From 336c22d581fa9e892c47a6a5158ee4ee5fc1c0f8 Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 24 Nov 2014 09:46:28 -0800 Subject: [PATCH] HBASE-12471 Task 4. replace internal ConnectionManager#{delete,get}Connection use with #close, #createConnection (0.98, 0.99) under src/main/java Move from HConnection to ClusterConnection or Connection Use unmanaged connections where we use managed previous (used the jdk7 https://docs.oracle.com/javase/7/docs/technotes/guides/language/try-with-resources.html idiom). In ZKConfig, synchronize on Configuration rather than make a copy. Making a copy we were dropping hbase configs in certain test context (could not find the zk ensemble because default port). In tests, some move to the new style connection setup but mostly fixes for premature connection close or adding cleanup where it was lacking. --- .../org/apache/hadoop/hbase/client/Admin.java | 4 +- .../hadoop/hbase/client/AsyncProcess.java | 22 +-- .../hbase/client/ConnectionManager.java | 2 +- .../hadoop/hbase/client/ConnectionUtils.java | 4 +- .../hadoop/hbase/client/HBaseAdmin.java | 71 +++---- .../apache/hadoop/hbase/client/MetaCache.java | 1 + .../hadoop/hbase/client/RegionLocator.java | 2 +- .../hadoop/hbase/zookeeper/ZKConfig.java | 21 +- .../hadoop/hbase/DistributedHBaseCluster.java | 42 ++-- .../hadoop/hbase/mapreduce/TableMapper.java | 2 +- .../hbase/mapreduce/TableOutputFormat.java | 1 - .../master/RegionPlacementMaintainer.java | 45 ++--- .../hadoop/hbase/master/ServerManager.java | 10 +- .../hbase/regionserver/HRegionServer.java | 3 +- .../hbase/regionserver/SplitLogWorker.java | 4 - .../org/apache/hadoop/hbase/HBaseCluster.java | 7 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 5 +- .../hadoop/hbase/client/TestAdmin1.java | 1 - .../hbase/client/TestHBaseAdminNoCluster.java | 14 +- .../apache/hadoop/hbase/client/TestHCM.java | 181 +++++++++--------- .../hbase/client/TestMultiParallel.java | 60 +++--- ...estLoadIncrementalHFilesSplitRecovery.java | 86 ++++++--- .../master/TestDistributedLogSplitting.java | 7 +- .../regionserver/TestScannerWithBulkload.java | 3 +- .../TestSplitTransactionOnCluster.java | 6 +- .../TestMultiSlaveReplication.java | 3 +- .../replication/TestReplicationBase.java | 37 ++-- ...tReplicationChangingPeerRegionservers.java | 2 +- .../replication/TestReplicationEndpoint.java | 64 +++++-- .../replication/TestReplicationKillRS.java | 3 +- .../TestReplicationSmallTests.java | 6 +- .../access/TestAccessController2.java | 2 +- 32 files changed, 387 insertions(+), 334 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index b04faac294a..a0ce1795b58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -595,7 +595,7 @@ public interface Admin extends Abortable, Closeable { /** * Compact all regions on the region server - * @param regionserver the region server name + * @param sn the region server name * @param major if it's major compaction * @throws IOException * @throws InterruptedException @@ -1288,7 +1288,7 @@ public interface Admin extends Abortable, Closeable { * @return A RegionServerCoprocessorRpcChannel instance */ CoprocessorRpcChannel coprocessorService(ServerName sn); - + /** * Update the configuration and trigger an online config change diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 3f55b0e9a9d..e0c14a6460d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -163,7 +163,7 @@ class AsyncProcess { // TODO: many of the fields should be made private protected final long id; - protected final ClusterConnection hConnection; + protected final ClusterConnection connection; protected final RpcRetryingCallerFactory rpcCallerFactory; protected final RpcControllerFactory rpcFactory; protected final BatchErrors globalErrors; @@ -246,7 +246,7 @@ class AsyncProcess { throw new IllegalArgumentException("HConnection cannot be null."); } - this.hConnection = hc; + this.connection = hc; this.pool = pool; this.globalErrors = useGlobalErrors ? new BatchErrors() : null; @@ -340,7 +340,7 @@ class AsyncProcess { new HashMap>(); List> retainedActions = new ArrayList>(rows.size()); - NonceGenerator ng = this.hConnection.getNonceGenerator(); + NonceGenerator ng = this.connection.getNonceGenerator(); long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. // Location errors that happen before we decide what requests to take. @@ -363,7 +363,7 @@ class AsyncProcess { try { if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); // Make sure we get 0-s replica. - RegionLocations locs = hConnection.locateRegion( + RegionLocations locs = connection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { throw new IOException("#" + id + ", no location found, aborting submit for" @@ -535,7 +535,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. int posInList = -1; - NonceGenerator ng = this.hConnection.getNonceGenerator(); + NonceGenerator ng = this.connection.getNonceGenerator(); for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -910,7 +910,7 @@ class AsyncProcess { ", row cannot be null"); RegionLocations loc = null; try { - loc = hConnection.locateRegion( + loc = connection.locateRegion( tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); } catch (IOException ex) { manageLocationError(action, ex); @@ -1025,7 +1025,7 @@ class AsyncProcess { if (tableName == null) { // tableName is null when we made a cross-table RPC call. - hConnection.clearCaches(server); + connection.clearCaches(server); } int failed = 0, stopped = 0; List> toReplay = new ArrayList>(); @@ -1036,7 +1036,7 @@ class AsyncProcess { // any of the regions in the MultiAction. // TODO: depending on type of exception we might not want to update cache at all? if (tableName != null) { - hConnection.updateCachedLocations(tableName, regionName, row, null, server); + connection.updateCachedLocations(tableName, regionName, row, null, server); } for (Action action : e.getValue()) { Retry retry = manageError( @@ -1150,7 +1150,7 @@ class AsyncProcess { // Register corresponding failures once per server/once per region. if (!regionFailureRegistered) { regionFailureRegistered = true; - hConnection.updateCachedLocations( + connection.updateCachedLocations( tableName, regionName, row.getRow(), result, server); } if (failureCount == 0) { @@ -1199,7 +1199,7 @@ class AsyncProcess { errorsByServer.reportServerError(server); canRetry = errorsByServer.canRetryMore(numAttempt); } - hConnection.updateCachedLocations( + connection.updateCachedLocations( tableName, region, actions.get(0).getAction().getRow(), throwable, server); failureCount += actions.size(); @@ -1514,7 +1514,7 @@ class AsyncProcess { @VisibleForTesting protected MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { - return new MultiServerCallable(hConnection, tableName, server, this.rpcFactory, multi); + return new MultiServerCallable(connection, tableName, server, this.rpcFactory, multi); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index bfdf5d2a20d..147a203dceb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -249,7 +249,7 @@ class ConnectionManager { */ @VisibleForTesting static NonceGenerator injectNonceGeneratorForTesting( - HConnection conn, NonceGenerator cnm) { + ClusterConnection conn, NonceGenerator cnm) { HConnectionImplementation connImpl = (HConnectionImplementation)conn; NonceGenerator ng = connImpl.getNonceGenerator(); LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index e26ae4826e8..72b447aff3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -79,7 +79,7 @@ public class ConnectionUtils { * @return old nonce generator. */ public static NonceGenerator injectNonceGeneratorForTesting( - HConnection conn, NonceGenerator cnm) { + ClusterConnection conn, NonceGenerator cnm) { return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm); } @@ -111,7 +111,7 @@ public class ConnectionUtils { * @param client the client interface of the local server * @return an adapted/decorated HConnection */ - public static HConnection createShortCircuitHConnection(final HConnection conn, + public static HConnection createShortCircuitHConnection(final Connection conn, final ServerName serverName, final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) { return new ConnectionAdapter(conn) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 329a3738b55..109352b4906 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; @@ -175,8 +174,6 @@ public class HBaseAdmin implements Admin { private static final String ZK_IDENTIFIER_PREFIX = "hbase-admin-on-"; - // We use the implementation class rather then the interface because we - // need the package protected functions to get the connection to master private ClusterConnection connection; private volatile Configuration conf; @@ -1443,8 +1440,7 @@ public class HBaseAdmin implements Admin { * Get all the online regions on a region server. */ @Override - public List getOnlineRegions( - final ServerName sn) throws IOException { + public List getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); return ProtobufUtil.getOnlineRegions(admin); } @@ -2333,12 +2329,6 @@ public class HBaseAdmin implements Admin { }); } - private HRegionLocation getFirstMetaServerForTable(final TableName tableName) - throws IOException { - return connection.locateRegion(TableName.META_TABLE_NAME, - HRegionInfo.createRegionName(tableName, null, HConstants.NINES, false)); - } - /** * @return Configuration used by the instance. */ @@ -2495,52 +2485,40 @@ public class HBaseAdmin implements Admin { /** * Check to see if HBase is running. Throw an exception if not. - * We consider that HBase is running if ZooKeeper and Master are running. - * * @param conf system configuration * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to zookeeper */ + // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not. public static void checkHBaseAvailable(Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { + throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { Configuration copyOfConf = HBaseConfiguration.create(conf); - // We set it to make it fail as soon as possible if HBase is not available copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); copyOfConf.setInt("zookeeper.recovery.retry", 0); - - ConnectionManager.HConnectionImplementation connection - = (ConnectionManager.HConnectionImplementation) - HConnectionManager.getConnection(copyOfConf); - - try { - // Check ZK first. - // If the connection exists, we may have a connection to ZK that does - // not work anymore - ZooKeeperKeepAliveConnection zkw = null; - try { - zkw = connection.getKeepAliveZooKeeperWatcher(); - zkw.getRecoverableZooKeeper().getZooKeeper().exists( - zkw.baseZNode, false); - - } catch (IOException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } catch (InterruptedException e) { - throw (InterruptedIOException) + try (ClusterConnection connection = + (ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) { + // Check ZK first. + // If the connection exists, we may have a connection to ZK that does not work anymore + ZooKeeperKeepAliveConnection zkw = null; + try { + // This is NASTY. FIX!!!! Dependent on internal implementation! TODO + zkw = ((ConnectionManager.HConnectionImplementation)connection). + getKeepAliveZooKeeperWatcher(); + zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false); + } catch (IOException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException("Can't connect to ZooKeeper").initCause(e); - } catch (KeeperException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } finally { - if (zkw != null) { - zkw.close(); + } catch (KeeperException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } finally { + if (zkw != null) { + zkw.close(); + } } - } - - // Check Master connection.isMasterRunning(); - - } finally { - connection.close(); } } @@ -3779,8 +3757,9 @@ public class HBaseAdmin implements Admin { @Override public int getMasterInfoPort() throws IOException { + // TODO: Fix! Reaching into internal implementation!!!! ConnectionManager.HConnectionImplementation connection = - (ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf); + (ConnectionManager.HConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterInfoPort(zkw); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 4ecddb59b66..a49f95c3c62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -63,6 +63,7 @@ public class MetaCache { * Search the cache for a location that fits our table and row key. * Return null if no suitable region is located. * + * * @param tableName * @param row * @return Null or region location found in cache. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java index 8168fe1033e..39518a619c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Pair; /** * Used to view region location information for a single HBase table. - * Obtain an instance from an {@link HConnection}. + * Obtain an instance from an {@link Connection}. * * @see ConnectionFactory * @see Connection diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index 75b0175440f..3dc9aa63f14 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -84,16 +84,19 @@ public class ZKConfig { Properties zkProperties = new Properties(); // Directly map all of the hbase.zookeeper.property.KEY properties. - for (Entry entry : new Configuration(conf)) { // copy for mt safety - String key = entry.getKey(); - if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { - String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); + // Synchronize on conf so no loading of configs while we iterate + synchronized (conf) { + for (Entry entry : conf) { + String key = entry.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.put(zkKey, value); } - zkProperties.put(zkKey, value); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index d97862d6532..6bc4143175a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -25,9 +25,11 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterManager.ServiceType; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; @@ -44,8 +46,8 @@ import com.google.common.collect.Sets; */ @InterfaceAudience.Private public class DistributedHBaseCluster extends HBaseCluster { - - private HBaseAdmin admin; + private Admin admin; + private final Connection connection; private ClusterManager clusterManager; @@ -53,7 +55,8 @@ public class DistributedHBaseCluster extends HBaseCluster { throws IOException { super(conf); this.clusterManager = clusterManager; - this.admin = new HBaseAdmin(conf); + this.connection = ConnectionFactory.createConnection(conf); + this.admin = this.connection.getAdmin(); this.initialClusterStatus = getClusterStatus(); } @@ -84,18 +87,21 @@ public class DistributedHBaseCluster extends HBaseCluster { if (this.admin != null) { admin.close(); } + if (this.connection != null && !this.connection.isClosed()) { + this.connection.close(); + } } @Override public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { - return admin.getConnection().getAdmin(serverName); + return ((ClusterConnection)this.connection).getAdmin(serverName); } @Override public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName) throws IOException { - return admin.getConnection().getClient(serverName); + return ((ClusterConnection)this.connection).getClient(serverName); } @Override @@ -138,8 +144,7 @@ public class DistributedHBaseCluster extends HBaseCluster { @Override public MasterService.BlockingInterface getMasterAdminService() throws IOException { - HConnection conn = HConnectionManager.getConnection(conf); - return conn.getMaster(); + return ((ClusterConnection)this.connection).getMaster(); } @Override @@ -183,18 +188,19 @@ public class DistributedHBaseCluster extends HBaseCluster { } @Override - public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { - HConnection connection = admin.getConnection(); - HRegionLocation regionLoc = connection.locateRegion(regionName); + public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException { + HRegionLocation regionLoc = null; + try (RegionLocator locator = connection.getRegionLocator(tn)) { + regionLoc = locator.getRegionLocation(regionName); + } if (regionLoc == null) { - LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) - + " for table " + HRegionInfo.getTableName(regionName) + ", start key [" + - Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]"); + LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) + + ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]"); return null; } AdminProtos.AdminService.BlockingInterface client = - connection.getAdmin(regionLoc.getServerName()); + ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName()); ServerInfo info = ProtobufUtil.getServerInfo(client); return ProtobufUtil.toServerName(info.getServerName()); } @@ -374,7 +380,7 @@ public class DistributedHBaseCluster extends HBaseCluster { } catch (IOException ioe) { LOG.warn("While closing the old connection", ioe); } - this.admin = new HBaseAdmin(conf); + this.admin = this.connection.getAdmin(); LOG.info("Added new HBaseAdmin"); return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java index 0d9b65a8f23..cde94fee272 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java @@ -37,4 +37,4 @@ import org.apache.hadoop.mapreduce.Mapper; public abstract class TableMapper extends Mapper { -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index c7c663905b7..84f68912e29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * while the output value must be either a {@link Put} or a * {@link Delete} instance. * - *

is the type of the key. Ignored in this class. */ @InterfaceAudience.Public @InterfaceStability.Stable diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index 1d16bdf5b1f..e7d52a29afb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -48,9 +48,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -67,9 +67,9 @@ import org.apache.log4j.Logger; /** * A tool that is used for manipulating and viewing favored nodes information * for regions. Run with -h to get a list of the options - * */ @InterfaceAudience.Private +// TODO: Remove? Unused. Partially implemented only. public class RegionPlacementMaintainer { private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class .getName()); @@ -93,9 +93,9 @@ public class RegionPlacementMaintainer { private Configuration conf; private final boolean enforceLocality; private final boolean enforceMinAssignmentMove; - private HBaseAdmin admin; private RackManager rackManager; private Set targetTableSet; + private final Connection connection; public RegionPlacementMaintainer(Configuration conf) { this(conf, true, true); @@ -108,7 +108,13 @@ public class RegionPlacementMaintainer { this.enforceMinAssignmentMove = enforceMinAssignmentMove; this.targetTableSet = new HashSet(); this.rackManager = new RackManager(conf); + try { + this.connection = ConnectionFactory.createConnection(this.conf); + } catch (IOException e) { + throw new RuntimeException(e); + } } + private static void printHelp(Options opt) { new HelpFormatter().printHelp( "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " + @@ -124,17 +130,6 @@ public class RegionPlacementMaintainer { } } - /** - * @return the cached HBaseAdmin - * @throws IOException - */ - private HBaseAdmin getHBaseAdmin() throws IOException { - if (this.admin == null) { - this.admin = new HBaseAdmin(this.conf); - } - return this.admin; - } - /** * @return the new RegionAssignmentSnapshot * @throws IOException @@ -142,7 +137,7 @@ public class RegionPlacementMaintainer { public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException { SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = - new SnapshotOfRegionAssignmentFromMeta(HConnectionManager.getConnection(conf)); + new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); currentAssignmentShapshot.initialize(); return currentAssignmentShapshot; } @@ -210,8 +205,10 @@ public class RegionPlacementMaintainer { // Get the all the region servers List servers = new ArrayList(); - servers.addAll(getHBaseAdmin().getClusterStatus().getServers()); - + try (Admin admin = this.connection.getAdmin()) { + servers.addAll(admin.getClusterStatus().getServers()); + } + LOG.info("Start to generate assignment plan for " + numRegions + " regions from table " + tableName + " with " + servers.size() + " region servers"); @@ -619,9 +616,9 @@ public class RegionPlacementMaintainer { // sort the map based on region info Map> assignmentMap = new TreeMap>(plan.getAssignmentMap()); - + for (Map.Entry> entry : assignmentMap.entrySet()) { - + String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); String regionName = entry.getKey().getRegionNameAsString(); LOG.info("Region: " + regionName ); @@ -660,7 +657,6 @@ public class RegionPlacementMaintainer { // Get the region to region server map Map> currentAssignment = this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); - HConnection connection = this.getHBaseAdmin().getConnection(); // track of the failed and succeeded updates int succeededNum = 0; @@ -691,10 +687,11 @@ public class RegionPlacementMaintainer { } if (singleServerPlan != null) { // Update the current region server with its updated favored nodes - BlockingInterface currentRegionServer = connection.getAdmin(entry.getKey()); + BlockingInterface currentRegionServer = + ((ClusterConnection)this.connection).getAdmin(entry.getKey()); UpdateFavoredNodesRequest request = RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); - + UpdateFavoredNodesResponse updateFavoredNodesResponse = currentRegionServer.updateFavoredNodes(null, request); LOG.info("Region server " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index ff0706b77d1..39d0a0f00ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -36,7 +36,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.HRegionInfo; @@ -46,8 +45,9 @@ import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; @@ -137,7 +137,7 @@ public class ServerManager { private final Server master; private final MasterServices services; - private final HConnection connection; + private final ClusterConnection connection; private final DeadServer deadservers = new DeadServer(); @@ -201,7 +201,7 @@ public class ServerManager { Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); - this.connection = connect ? HConnectionManager.getConnection(c) : null; + this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ed04a8639ee..464ad7eaea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -610,7 +611,7 @@ public class HRegionServer extends HasThread implements */ protected HConnection createShortCircuitConnection() throws IOException { return ConnectionUtils.createShortCircuitHConnection( - HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices); + ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 8bdf12af91d..a182aa1a8d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; @@ -135,9 +134,6 @@ public class SplitLogWorker implements Runnable { try { LOG.info("SplitLogWorker " + server.getServerName() + " starting"); coordination.registerListener(); - // pre-initialize a new connection for splitlogworker configuration - HConnectionManager.getConnection(conf); - // wait for Coordination Engine is ready boolean res = false; while (!res && !coordination.isStop()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java index 76a9566d808..9e7a0c477be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java @@ -250,15 +250,18 @@ public abstract class HBaseCluster implements Closeable, Configurable { * Get the ServerName of region server serving the first hbase:meta region */ public ServerName getServerHoldingMeta() throws IOException { - return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + return getServerHoldingRegion(TableName.META_TABLE_NAME, + HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); } /** * Get the ServerName of region server serving the specified region * @param regionName Name of the region in bytes + * @param tn Table name that has the region. * @return ServerName that hosts the region or null */ - public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException; + public abstract ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) + throws IOException; /** * @return whether we are interacting with a distributed cluster as opposed to an diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index a3ec0045803..7672ac1b662 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -517,7 +516,6 @@ public class MiniHBaseCluster extends HBaseCluster { if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); } - HConnectionManager.deleteAllConnections(false); } @Override @@ -657,7 +655,8 @@ public class MiniHBaseCluster extends HBaseCluster { } @Override - public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { + public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) + throws IOException { // Assume there is only one master thread which is the active master. // If there are multiple master threads, the backup master threads // should hold some regions. Please refer to #countServedRegions diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java index 3d3b499e640..131adb60416 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java @@ -882,7 +882,6 @@ public class TestAdmin1 { admin.createTable(desc, splitKeys); HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); Map regions = ht.getRegionLocations(); - ht.close(); assertEquals("Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); // Disable table. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index 7521bb7df09..73fffe34a44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -52,7 +52,7 @@ public class TestHBaseAdminNoCluster { * @throws ServiceException */ @Test - public void testMasterMonitorCollableRetries() + public void testMasterMonitorCallableRetries() throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { Configuration configuration = HBaseConfiguration.create(); // Set the pause and retry count way down. @@ -61,20 +61,18 @@ public class TestHBaseAdminNoCluster { configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); // Get mocked connection. Getting the connection will register it so when HBaseAdmin is // constructed with same configuration, it will find this mocked connection. - HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); + ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); // Mock so we get back the master interface. Make it so when createTable is called, we throw // the PleaseHoldException. - MasterKeepAliveConnection masterAdmin = - Mockito.mock(MasterKeepAliveConnection.class); + MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), (CreateTableRequest)Mockito.any())). thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test"))); Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin); - // Mock up our admin Interfaces - Admin admin = new HBaseAdmin(configuration); + Admin admin = new HBaseAdmin(connection); try { HTableDescriptor htd = - new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries")); + new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries")); // Pass any old htable descriptor; not important try { admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); @@ -87,7 +85,7 @@ public class TestHBaseAdminNoCluster { (CreateTableRequest)Mockito.any()); } finally { admin.close(); - if (connection != null)HConnectionManager.deleteConnection(configuration); + if (connection != null) connection.close(); } } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 5a6065c2ec8..e8f668bafa0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -132,6 +132,8 @@ public class TestHCM { @BeforeClass public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); if (isJavaOk) { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); } @@ -1010,117 +1012,114 @@ public class TestHCM { @Test (timeout=30000) public void testMulti() throws Exception { HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM); - TEST_UTIL.createMultiRegions(table, FAM_NAM); - ConnectionManager.HConnectionImplementation conn = - (ConnectionManager.HConnectionImplementation) - HConnectionManager.getConnection(TEST_UTIL.getConfiguration()); + try { + TEST_UTIL.createMultiRegions(table, FAM_NAM); + ConnectionManager.HConnectionImplementation conn = + ( ConnectionManager.HConnectionImplementation)table.getConnection(); - // We're now going to move the region and check that it works for the client - // First a new put to add the location in the cache - conn.clearRegionCache(TABLE_NAME3); - Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); + // We're now going to move the region and check that it works for the client + // First a new put to add the location in the cache + conn.clearRegionCache(TABLE_NAME3); + Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); - TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - // We can wait for all regions to be online, that makes log reading easier when debugging - TEST_UTIL.waitUntilNoRegionsInTransition(20000); + // We can wait for all regions to be online, that makes log reading easier when debugging + while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(1); + } - Put put = new Put(ROW_X); - put.add(FAM_NAM, ROW_X, ROW_X); - table.put(put); + Put put = new Put(ROW_X); + put.add(FAM_NAM, ROW_X, ROW_X); + table.put(put); - // Now moving the region to the second server - HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); - if (toMove == null) { - String msg = "Failed to find location for " + Bytes.toString(ROW_X) + " in " + TABLE_NAME3; - // Log so easier to see in output where error occurred. - LOG.error(msg); - throw new NullPointerException(msg); - } - byte[] regionName = toMove.getRegionInfo().getRegionName(); - byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); + // Now moving the region to the second server + HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); + byte[] regionName = toMove.getRegionInfo().getRegionName(); + byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); - // Choose the other server. - int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); - int destServerId = (curServerId == 0 ? 1 : 0); + // Choose the other server. + int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); + int destServerId = (curServerId == 0 ? 1 : 0); - HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); - HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); + HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); + HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); - ServerName destServerName = destServer.getServerName(); + ServerName destServerName = destServer.getServerName(); - //find another row in the cur server that is less than ROW_X - List regions = curServer.getOnlineRegions(TABLE_NAME3); - byte[] otherRow = null; - for (HRegion region : regions) { - if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) - && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { - otherRow = region.getRegionInfo().getStartKey(); - break; - } - } - assertNotNull(otherRow); - // If empty row, set it to first row.-f - if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); - Put put2 = new Put(otherRow); - put2.add(FAM_NAM, otherRow, otherRow); - table.put(put2); //cache put2's location + //find another row in the cur server that is less than ROW_X + List regions = curServer.getOnlineRegions(TABLE_NAME3); + byte[] otherRow = null; + for (HRegion region : regions) { + if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) + && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { + otherRow = region.getRegionInfo().getStartKey(); + break; + } + } + assertNotNull(otherRow); + // If empty row, set it to first row.-f + if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); + Put put2 = new Put(otherRow); + put2.add(FAM_NAM, otherRow, otherRow); + table.put(put2); //cache put2's location - // Check that we are in the expected state - Assert.assertTrue(curServer != destServer); - Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); - Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); - Assert.assertNotNull(curServer.getOnlineRegion(regionName)); - Assert.assertNull(destServer.getOnlineRegion(regionName)); - Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). - getAssignmentManager().getRegionStates().isRegionsInTransition()); + // Check that we are in the expected state + Assert.assertTrue(curServer != destServer); + Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); + Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); + Assert.assertNotNull(curServer.getOnlineRegion(regionName)); + Assert.assertNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().getRegionStates().isRegionsInTransition()); - // Moving. It's possible that we don't have all the regions online at this point, so - // the test must depends only on the region we're looking at. - LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); - TEST_UTIL.getHBaseAdmin().move( - toMove.getRegionInfo().getEncodedNameAsBytes(), - destServerName.getServerName().getBytes() - ); + // Moving. It's possible that we don't have all the regions online at this point, so + // the test must depends only on the region we're looking at. + LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + destServerName.getServerName().getBytes() + ); - while (destServer.getOnlineRegion(regionName) == null || - destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || - curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || - master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { - // wait for the move to be finished - Thread.sleep(1); - } + while (destServer.getOnlineRegion(regionName) == null || + destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + // wait for the move to be finished + Thread.sleep(1); + } - LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); + LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); - // Check our new state. - Assert.assertNull(curServer.getOnlineRegion(regionName)); - Assert.assertNotNull(destServer.getOnlineRegion(regionName)); - Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); - Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + // Check our new state. + Assert.assertNull(curServer.getOnlineRegion(regionName)); + Assert.assertNotNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); - // Cache was NOT updated and points to the wrong server - Assert.assertFalse( - conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() - .getPort() == destServerName.getPort()); + // Cache was NOT updated and points to the wrong server + Assert.assertFalse( + conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() + .getPort() == destServerName.getPort()); - // Hijack the number of retry to fail after 2 tries - final int prevNumRetriesVal = setNumTries(conn, 2); + // Hijack the number of retry to fail after 2 tries + final int prevNumRetriesVal = setNumTries(conn, 2); - Put put3 = new Put(ROW_X); - put3.add(FAM_NAM, ROW_X, ROW_X); - Put put4 = new Put(otherRow); - put4.add(FAM_NAM, otherRow, otherRow); + Put put3 = new Put(ROW_X); + put3.add(FAM_NAM, ROW_X, ROW_X); + Put put4 = new Put(otherRow); + put4.add(FAM_NAM, otherRow, otherRow); - // do multi - table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, - // second we get RegionMovedException. + // do multi + table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, + // second we get RegionMovedException. - setNumTries(conn, prevNumRetriesVal); - table.close(); - conn.close(); + setNumTries(conn, prevNumRetriesVal); + } finally { + table.close(); + } } @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 388b3c9241e..69267ec77fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -33,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -42,14 +40,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.exceptions.OperationConflictException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; -import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -71,6 +66,7 @@ public class TestMultiParallel { private static final byte [][] KEYS = makeKeys(); private static final int slaves = 5; // also used for testing HTable pool size + private static Connection CONNECTION; @BeforeClass public static void beforeClass() throws Exception { // Uncomment the following lines if more verbosity is needed for @@ -83,9 +79,11 @@ public class TestMultiParallel { UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); UTIL.waitTableEnabled(TEST_TABLE); t.close(); + CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); } @AfterClass public static void afterClass() throws Exception { + CONNECTION.close(); UTIL.shutdownMiniCluster(); } @@ -98,9 +96,6 @@ public class TestMultiParallel { // Wait until completing balance UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); } - HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration()); - conn.clearRegionCache(); - conn.close(); LOG.info("before done"); } @@ -150,20 +145,26 @@ public class TestMultiParallel { * @throws SecurityException */ @Test(timeout=300000) - public void testActiveThreadsCount() throws Exception{ - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); - List puts = constructPutRequests(); // creates a Put for every region - table.batch(puts); - HashSet regionservers = new HashSet(); - for (byte[] k : KEYS) { - HRegionLocation location = ((HTable)table).getRegionLocation(k); - regionservers.add(location.getServerName()); + public void testActiveThreadsCount() throws Exception { + try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { + ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); + try { + try (Table t = connection.getTable(TEST_TABLE, executor)) { + List puts = constructPutRequests(); // creates a Put for every region + t.batch(puts); + HashSet regionservers = new HashSet(); + try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { + for (Row r : puts) { + HRegionLocation location = locator.getRegionLocation(r.getRow()); + regionservers.add(location.getServerName()); + } + } + assertEquals(regionservers.size(), executor.getLargestPoolSize()); + } + } finally { + executor.shutdownNow(); + } } - Field poolField = table.getClass().getDeclaredField("pool"); - poolField.setAccessible(true); - ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table); - assertEquals(regionservers.size(), tExecutor.getLargestPoolSize()); - table.close(); } @Test(timeout=300000) @@ -198,7 +199,7 @@ public class TestMultiParallel { Cell[] multiKvs = multiRes[i].rawCells(); for (int j = 0; j < singleKvs.length; j++) { Assert.assertEquals(singleKvs[j], multiKvs[j]); - Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), + Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j]))); } } @@ -330,7 +331,7 @@ public class TestMultiParallel { @Test (timeout=300000) public void testBatchWithPut() throws Exception { LOG.info("test=testBatchWithPut"); - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch List puts = constructPutRequests(); @@ -348,9 +349,8 @@ public class TestMultiParallel { results = table.batch(puts); } catch (RetriesExhaustedWithDetailsException ree) { LOG.info(ree.getExhaustiveDescription()); - throw ree; - } finally { table.close(); + throw ree; } validateSizeAndEmpty(results, KEYS.length); } @@ -491,7 +491,8 @@ public class TestMultiParallel { @Test(timeout=300000) public void testNonceCollision() throws Exception { LOG.info("test=testNonceCollision"); - HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table table = connection.getTable(TEST_TABLE); Put put = new Put(ONE_ROW); put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); @@ -510,8 +511,9 @@ public class TestMultiParallel { return nonce; } }; + NonceGenerator oldCnm = - ConnectionUtils.injectNonceGeneratorForTesting(table.getConnection(), cnm); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm); // First test sequential requests. try { @@ -541,7 +543,7 @@ public class TestMultiParallel { public void run() { Table table = null; try { - table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + table = connection.getTable(TEST_TABLE); } catch (IOException e) { fail("Not expected"); } @@ -574,7 +576,7 @@ public class TestMultiParallel { validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); table.close(); } finally { - ConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm); + ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 47501d8c857..323d0e9aa44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; @@ -232,10 +234,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { * @throws IOException */ void assertExpectedTable(TableName table, int count, int value) throws IOException { + HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString()); + assertEquals(htds.length, 1); Table t = null; try { - assertEquals( - util.getHBaseAdmin().listTables(table.getNameAsString()).length, 1); t = new HTable(util.getConfiguration(), table); Scan s = new Scan(); ResultScanner sr = t.getScanner(s); @@ -444,30 +446,33 @@ public class TestLoadIncrementalHFilesSplitRecovery { public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta"); byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; - HTable table = new HTable(util.getConfiguration(), tableName); + // Share connection. We were failing to find the table with our new reverse scan because it + // looks for first region, not any region -- that is how it works now. The below removes first + // region in test. Was reliant on the Connection caching having first region. + Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); + Table table = connection.getTable(tableName); setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); Path dir = buildBulkFiles(tableName, 2); final AtomicInteger countedLqis = new AtomicInteger(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles( - util.getConfiguration()) { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { - protected List groupOrSplit( - Multimap regionGroups, - final LoadQueueItem item, final HTable htable, - final Pair startEndKeys) throws IOException { - List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); - if (lqis != null) { - countedLqis.addAndGet(lqis.size()); + protected List groupOrSplit( + Multimap regionGroups, + final LoadQueueItem item, final HTable htable, + final Pair startEndKeys) throws IOException { + List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); + if (lqis != null) { + countedLqis.addAndGet(lqis.size()); + } + return lqis; } - return lqis; - } - }; + }; // do bulkload when there is no region hole in hbase:meta. try { - loader.doBulkLoad(dir, table); + loader.doBulkLoad(dir, (HTable)table); } catch (Exception e) { LOG.error("exeception=", e); } @@ -477,18 +482,16 @@ public class TestLoadIncrementalHFilesSplitRecovery { dir = buildBulkFiles(tableName, 3); // Mess it up by leaving a hole in the hbase:meta - HConnection hConnection = HConnectionManager.getConnection(util.getConfiguration()); - List regionInfos = MetaTableAccessor.getTableRegions( - hConnection, tableName); + List regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); for (HRegionInfo regionInfo : regionInfos) { if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { - MetaTableAccessor.deleteRegion(hConnection, regionInfo); + MetaTableAccessor.deleteRegion(connection, regionInfo); break; } } try { - loader.doBulkLoad(dir, table); + loader.doBulkLoad(dir, (HTable)table); } catch (Exception e) { LOG.error("exeception=", e); assertTrue("IOException expected", e instanceof IOException); @@ -496,7 +499,42 @@ public class TestLoadIncrementalHFilesSplitRecovery { table.close(); - this.assertExpectedTable(tableName, ROWCOUNT, 2); - } -} + // Make sure at least the one region that still exists can be found. + regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + assertTrue(regionInfos.size() >= 1); + this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); + connection.close(); + } + + /** + * Checks that all columns have the expected value and that there is the + * expected number of rows. + * @throws IOException + */ + void assertExpectedTable(final Connection connection, TableName table, int count, int value) + throws IOException { + HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString()); + assertEquals(htds.length, 1); + Table t = null; + try { + t = connection.getTable(table); + Scan s = new Scan(); + ResultScanner sr = t.getScanner(s); + int i = 0; + for (Result r : sr) { + i++; + for (NavigableMap nm : r.getNoVersionMap().values()) { + for (byte[] val : nm.values()) { + assertTrue(Bytes.equals(val, value(value))); + } + } + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } finally { + if (t != null) t.close(); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 366a36b3cc2..a16d7e5888d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -107,8 +108,6 @@ import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -337,7 +336,7 @@ public class TestDistributedLogSplitting { HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE); NonceGeneratorWithDups ng = new NonceGeneratorWithDups(); NonceGenerator oldNg = - ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), ng); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng); try { List reqs = new ArrayList(); @@ -371,7 +370,7 @@ public class TestDistributedLogSplitting { } } } finally { - ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), oldNg); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg); ht.close(); zkw.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 5593d802815..49ded212120 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -179,7 +179,6 @@ public class TestScannerWithBulkload { table.put(put1); table.flushCommits(); admin.flush(tableName); - admin.close(); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); @@ -200,7 +199,7 @@ public class TestScannerWithBulkload { @Test public void testBulkLoadWithParallelScan() throws Exception { TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan"); - final long l = System.currentTimeMillis(); + final long l = System.currentTimeMillis(); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); createTable(admin, tableName); Scan scan = createScan(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 29073edcf89..678adc45fe4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -107,8 +107,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** - * Like {@link TestSplitTransaction} in that we're testing {@link SplitTransaction} - * only the below tests are against a running cluster where {@link TestSplitTransaction} + * Like TestSplitTransaction in that we're testing {@link SplitTransaction} + * only the below tests are against a running cluster where TestSplitTransaction * is tests against a bare {@link HRegion}. */ @Category({RegionServerTests.class, LargeTests.class}) @@ -904,7 +904,7 @@ public class TestSplitTransactionOnCluster { fail("Each table should have at least one region."); } ServerName serverName = - cluster.getServerHoldingRegion(firstTableRegions.get(0).getRegionName()); + cluster.getServerHoldingRegion(firstTable, firstTableRegions.get(0).getRegionName()); admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(serverName.getServerName())); Table table1 = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 7042724d3ea..f9aeb6f5cfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -224,7 +224,8 @@ public class TestMultiSlaveReplication { region.getWAL().registerWALActionsListener(listener); // request a roll - admin.rollWALWriter(cluster.getServerHoldingRegion(region.getRegionName())); + admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), + region.getRegionName())); // wait try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index de46f25a87a..ad9b227ff35 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -29,15 +28,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -48,10 +45,10 @@ import org.junit.BeforeClass; * All other tests should have their own classes and extend this one */ public class TestReplicationBase { - +/* { ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL); - } + }*/ private static final Log LOG = LogFactory.getLog(TestReplicationBase.class); @@ -64,7 +61,7 @@ public class TestReplicationBase { protected static ReplicationAdmin admin; - protected static HTable htable1; + protected static Table htable1; protected static Table htable2; protected static HBaseTestingUtility utility1; @@ -138,15 +135,19 @@ public class TestReplicationBase { table.addFamily(fam); fam = new HColumnDescriptor(noRepfamName); table.addFamily(fam); - Admin admin1 = new HBaseAdmin(conf1); - Admin admin2 = new HBaseAdmin(conf2); - admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection2 = ConnectionFactory.createConnection(conf2); + try (Admin admin1 = connection1.getAdmin()) { + admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + try (Admin admin2 = connection2.getAdmin()) { + admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } utility1.waitUntilAllRegionsAssigned(tableName); - admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); utility2.waitUntilAllRegionsAssigned(tableName); - htable1 = new HTable(conf1, tableName); + htable1 = connection1.getTable(tableName); htable1.setWriteBufferSize(1024); - htable2 = new HTable(conf2, tableName); + htable2 = connection2.getTable(tableName); } /** @@ -154,10 +155,10 @@ public class TestReplicationBase { */ @AfterClass public static void tearDownAfterClass() throws Exception { + htable2.close(); + htable1.close(); + admin.close(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); } - - -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index 6dc354858a8..031960776e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -54,7 +54,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 456a0867b94..633dcc98b28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -26,16 +26,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.junit.AfterClass; import org.junit.Assert; @@ -49,6 +54,7 @@ import org.junit.experimental.categories.Category; */ @Category({ReplicationTests.class, MediumTests.class}) public class TestReplicationEndpoint extends TestReplicationBase { + static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class); static int numRegionServers; @@ -72,13 +78,14 @@ public class TestReplicationEndpoint extends TestReplicationBase { ReplicationEndpointForTest.contructedCount.set(0); ReplicationEndpointForTest.startedCount.set(0); ReplicationEndpointForTest.replicateCount.set(0); + ReplicationEndpointReturningFalse.replicated.set(false); ReplicationEndpointForTest.lastEntries = null; for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) { utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName()); } } - @Test + @Test (timeout=120000) public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", @@ -117,17 +124,32 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testCustomReplicationEndpoint"); } - @Test + @Test (timeout=120000) public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { - admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate", + Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); + int peerCount = admin.getPeersCount(); + final String id = "testReplicationEndpointReturnsFalseOnReplicate"; + admin.addPeer(id, new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); - // now replicate some data. + // This test is flakey and then there is so much stuff flying around in here its, hard to + // debug. Peer needs to be up for the edit to make it across. This wait on + // peer count seems to be a hack that has us not progress till peer is up. + if (admin.getPeersCount() <= peerCount) { + LOG.info("Waiting on peercount to go up from " + peerCount); + Threads.sleep(100); + } + // now replicate some data doPut(row); Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { + // Looks like replication endpoint returns false unless we put more than 10 edits. We + // only send over one edit. + int count = ReplicationEndpointForTest.replicateCount.get(); + LOG.info("count=" + count); return ReplicationEndpointReturningFalse.replicated.get(); } }); @@ -138,15 +160,17 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); } - @Test + @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); // now replicate some data. - doPut(Bytes.toBytes("row1")); - doPut(row); - doPut(Bytes.toBytes("row2")); + try (Connection connection = ConnectionFactory.createConnection(conf1)) { + doPut(connection, Bytes.toBytes("row1")); + doPut(connection, row); + doPut(connection, Bytes.toBytes("row2")); + } Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @Override @@ -161,11 +185,17 @@ public class TestReplicationEndpoint extends TestReplicationBase { private void doPut(byte[] row) throws IOException { - Put put = new Put(row); - put.add(famName, row, row); - htable1 = new HTable(conf1, tableName); - htable1.put(put); - htable1.close(); + try (Connection connection = ConnectionFactory.createConnection(conf1)) { + doPut(connection, row); + } + } + + private void doPut(final Connection connection, final byte [] row) throws IOException { + try (Table t = connection.getTable(tableName)) { + Put put = new Put(row); + put.add(famName, row, row); + t.put(put); + } } private static void doAssert(byte[] row) throws Exception { @@ -217,6 +247,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { + static int COUNT = 10; static AtomicReference ex = new AtomicReference(null); static AtomicBoolean replicated = new AtomicBoolean(false); @Override @@ -229,8 +260,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { } super.replicate(replicateContext); + LOG.info("Replicated " + row + ", count=" + replicateCount.get()); - replicated.set(replicateCount.get() > 10); // first 10 times, we return false + replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false return replicated.get(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java index 5739aee16af..6a6cf219176 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -54,7 +55,7 @@ public class TestReplicationKillRS extends TestReplicationBase { Thread killer = killARegionServer(util, 5000, rsToKill1); LOG.info("Start loading table"); - int initialCount = utility1.loadTable(htable1, famName); + int initialCount = utility1.loadTable((HTable)htable1, famName); LOG.info("Done loading table"); killer.join(5000); LOG.info("Done waiting for threads"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 3ecec9122df..bb7794fa0ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -69,7 +69,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlush(true, true); + ((HTable)htable1).setAutoFlush(true, true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : @@ -247,7 +247,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { LOG.info("testSmallBatch"); Put put; // normal Batch tests - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { put = new Put(Bytes.toBytes(i)); put.add(famName, row, row); @@ -387,7 +387,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testLoading() throws Exception { LOG.info("Writing out rows to table1 in testLoading"); htable1.setWriteBufferSize(1024); - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { Put put = new Put(Bytes.toBytes(i)); put.add(famName, row, row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java index c41e97720a5..6fa6cbd7e40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java @@ -86,7 +86,7 @@ public class TestAccessController2 extends SecureTestUtil { public Object run() throws Exception { HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName()); desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); - Admin admin = new HBaseAdmin(conf); + Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); try { admin.createTable(desc); } finally {