From 2bbcc5a1225cd702a45b787d624714a9e22cd0e2 Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Wed, 1 Aug 2007 20:10:11 +0000 Subject: [PATCH] HADOOP-1528 HClient for multiple tables (phase 1) Modified: HConstants static final Text[] COL_REGIONINFO_ARRAY = new Text [] {COL_REGIONINFO}; static final Text EMPTY_START_ROW = new Text(); HMaster - don't process a region server exit message if the lease has timed out. Otherwise we end up with two pending server shutdown messages to process and chaos ensues. - don't reassign the root region when the server's lease expires. The lease expiration handler will queue a PendingServerShutdown operation that must run before the root region is reassigned because the HLog of the dead server must be split before any regions served by the dead server are reassigned. - added some additional debug level logging HBaseClusterTestCase - call HConnectionManager.deleteConnection(conf) in tearDown() so that multiple tests can be run from the same test class. TestScanner2 - changes to make test compatible with the change from inner class HClient.RegionLocation to public class HRegionLocation Leases - cancelLease just returns if the lease is not found instead of throwing an IOException New: HConnection - an interface that describes the operations performed by a connection implementation HConnectionManager - manages connections for multiple HBase instances and returns an object that implements HConnection from its static method getConnection HBaseAdmin - the HBase administrative methods refactored out of HClient. Each HBaseAdmin object can control a single HBase instance. To manipulate multiple instances, create multiple HBaseAdmin objects. HTable - The data manipulation methods refactored out of HClient. Each HTable object talks to a single table in a single HBase instance. Create multiple HTable objects to use more than one table. HRegionLocation - an inner class refactored out of HClient. Each HRegionLocation has an HRegionInfo object and an HServerAddress object. HClient - totally re-implemented in terms of the new classes above. HClient is now deprecated. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@561935 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../org/apache/hadoop/hbase/HBaseAdmin.java | 481 +++++ src/java/org/apache/hadoop/hbase/HClient.java | 1595 ++--------------- .../org/apache/hadoop/hbase/HConnection.java | 92 + .../hadoop/hbase/HConnectionManager.java | 761 ++++++++ .../org/apache/hadoop/hbase/HConstants.java | 12 +- src/java/org/apache/hadoop/hbase/HMaster.java | 70 +- .../apache/hadoop/hbase/HRegionLocation.java | 94 + src/java/org/apache/hadoop/hbase/HTable.java | 850 +++++++++ src/java/org/apache/hadoop/hbase/Leases.java | 19 +- .../hadoop/hbase/HBaseClusterTestCase.java | 3 + .../hbase/TestCleanRegionServerExit.java | 4 + .../hadoop/hbase/TestRegionServerAbort.java | 3 +- .../org/apache/hadoop/hbase/TestScanner2.java | 8 +- 14 files changed, 2492 insertions(+), 1502 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/HBaseAdmin.java create mode 100644 src/java/org/apache/hadoop/hbase/HConnection.java create mode 100644 src/java/org/apache/hadoop/hbase/HConnectionManager.java create mode 100644 src/java/org/apache/hadoop/hbase/HRegionLocation.java create mode 100644 src/java/org/apache/hadoop/hbase/HTable.java diff --git a/CHANGES.txt b/CHANGES.txt index dbaf2113650..1f80d5bf325 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -79,3 +79,5 @@ Trunk (unreleased changes) 10 concurrent clients 50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches to a single row at a time) + 51. HADOOP-1528 HClient for multiple tables (phase 1) + diff --git a/src/java/org/apache/hadoop/hbase/HBaseAdmin.java b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java new file mode 100644 index 00000000000..05e31df1cee --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java @@ -0,0 +1,481 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.SortedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.KeyedData; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Provides administrative functions for HBase + */ +public class HBaseAdmin implements HConstants { + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + protected final HConnection connection; + protected final long pause; + protected final int numRetries; + protected volatile HMasterInterface master; + + /** + * Constructor + * + * @param conf Configuration object + * @throws MasterNotRunningException + */ + public HBaseAdmin(Configuration conf) throws MasterNotRunningException { + this.connection = HConnectionManager.getConnection(conf); + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); + this.master = connection.getMaster(); + } + + /** + * Creates a new table + * + * @param desc table descriptor for table + * + * @throws IllegalArgumentException if the table name is reserved + * @throws MasterNotRunningException if master is not running + * @throws NoServerForRegionException if root region is not being served + * @throws TableExistsException if table already exists (If concurrent + * threads, the table may have been created between test-for-existence + * and attempt-at-creation). + * @throws IOException + */ + public void createTable(HTableDescriptor desc) + throws IOException { + + createTableAsync(desc); + + // Wait for new table to come on-line + connection.getTableServers(desc.getName()); + } + + /** + * Creates a new table but does not block and wait for it to come online. + * + * @param desc table descriptor for table + * + * @throws IllegalArgumentException if the table name is reserved + * @throws MasterNotRunningException if master is not running + * @throws NoServerForRegionException if root region is not being served + * @throws TableExistsException if table already exists (If concurrent + * threads, the table may have been created between test-for-existence + * and attempt-at-creation). + * @throws IOException + */ + public void createTableAsync(HTableDescriptor desc) + throws IOException { + + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(desc.getName()); + try { + this.master.createTable(desc); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + } + + /** + * Deletes a table + * + * @param tableName name of table to delete + * @throws IOException + */ + public void deleteTable(Text tableName) throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(tableName); + HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); + + try { + this.master.deleteTable(tableName); + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + // Wait until first region is deleted + HRegionInterface server = + connection.getHRegionConnection(firstMetaServer.getServerAddress()); + DataInputBuffer inbuf = new DataInputBuffer(); + HRegionInfo info = new HRegionInfo(); + for (int tries = 0; tries < numRetries; tries++) { + long scannerId = -1L; + try { + scannerId = + server.openScanner(firstMetaServer.getRegionInfo().getRegionName(), + COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); + KeyedData[] values = server.next(scannerId); + if (values == null || values.length == 0) { + break; + } + boolean found = false; + for (int j = 0; j < values.length; j++) { + if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { + inbuf.reset(values[j].getData(), values[j].getData().length); + info.readFields(inbuf); + if (info.tableDesc.getName().equals(tableName)) { + found = true; + } + } + } + if (!found) { + break; + } + + } catch (IOException ex) { + if(tries == numRetries - 1) { // no more tries left + if (ex instanceof RemoteException) { + ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); + } + throw ex; + } + + } finally { + if (scannerId != -1L) { + try { + server.close(scannerId); + } catch (Exception ex) { + LOG.warn(ex); + } + } + } + + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + LOG.info("table " + tableName + " deleted"); + } + + /** + * Brings a table on-line (enables it) + * + * @param tableName name of the table + * @throws IOException + */ + public void enableTable(Text tableName) throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(tableName); + HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); + + try { + this.master.enableTable(tableName); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + // Wait until first region is enabled + + HRegionInterface server = + connection.getHRegionConnection(firstMetaServer.getServerAddress()); + + DataInputBuffer inbuf = new DataInputBuffer(); + HRegionInfo info = new HRegionInfo(); + for (int tries = 0; tries < numRetries; tries++) { + int valuesfound = 0; + long scannerId = -1L; + try { + scannerId = + server.openScanner(firstMetaServer.getRegionInfo().getRegionName(), + COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); + boolean isenabled = false; + + while (true) { + KeyedData[] values = server.next(scannerId); + if (values == null || values.length == 0) { + if (valuesfound == 0) { + throw new NoSuchElementException( + "table " + tableName + " not found"); + } + break; + } + valuesfound += 1; + for (int j = 0; j < values.length; j++) { + if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { + inbuf.reset(values[j].getData(), values[j].getData().length); + info.readFields(inbuf); + isenabled = !info.offLine; + break; + } + } + if (isenabled) { + break; + } + } + if (isenabled) { + break; + } + + } catch (IOException e) { + if (tries == numRetries - 1) { // no more retries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if (scannerId != -1L) { + try { + server.close(scannerId); + + } catch (Exception e) { + LOG.warn(e); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sleep. Waiting for first region to be enabled from " + + tableName); + } + try { + Thread.sleep(pause); + + } catch (InterruptedException e) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Waiting for first region to be enabled from " + + tableName); + } + } + LOG.info("Enabled table " + tableName); + } + + /** + * Disables a table (takes it off-line) If it is being served, the master + * will tell the servers to stop serving it. + * + * @param tableName name of table + * @throws IOException + */ + public void disableTable(Text tableName) throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(tableName); + HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); + + try { + this.master.disableTable(tableName); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + // Wait until first region is disabled + + HRegionInterface server = + connection.getHRegionConnection(firstMetaServer.getServerAddress()); + + DataInputBuffer inbuf = new DataInputBuffer(); + HRegionInfo info = new HRegionInfo(); + for(int tries = 0; tries < numRetries; tries++) { + int valuesfound = 0; + long scannerId = -1L; + try { + scannerId = + server.openScanner(firstMetaServer.getRegionInfo().getRegionName(), + COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); + + boolean disabled = false; + while (true) { + KeyedData[] values = server.next(scannerId); + if (values == null || values.length == 0) { + if (valuesfound == 0) { + throw new NoSuchElementException("table " + tableName + " not found"); + } + break; + } + valuesfound += 1; + for (int j = 0; j < values.length; j++) { + if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { + inbuf.reset(values[j].getData(), values[j].getData().length); + info.readFields(inbuf); + disabled = info.offLine; + break; + } + } + if (disabled) { + break; + } + } + if (disabled) { + break; + } + + } catch (IOException e) { + if (tries == numRetries - 1) { // no more retries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if (scannerId != -1L) { + try { + server.close(scannerId); + + } catch (Exception e) { + LOG.warn(e); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sleep. Waiting for first region to be disabled from " + + tableName); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Waiting for first region to be disabled from " + + tableName); + } + } + LOG.info("Disabled table " + tableName); + } + + /** + * @param tableName Table to check. + * @return True if table exists already. + * @throws MasterNotRunningException + */ + public boolean tableExists(final Text tableName) throws MasterNotRunningException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + return connection.tableExists(tableName); + } + + /** + * Add a column to an existing table + * + * @param tableName name of the table to add column to + * @param column column descriptor of column to be added + * @throws IOException + */ + public void addColumn(Text tableName, HColumnDescriptor column) + throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(tableName); + try { + this.master.addColumn(tableName, column); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + } + + /** + * Delete a column from a table + * + * @param tableName name of table + * @param columnName name of column to be deleted + * @throws IOException + */ + public void deleteColumn(Text tableName, Text columnName) + throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + checkReservedTableName(tableName); + try { + this.master.deleteColumn(tableName, columnName); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + } + + /** + * Shuts down the HBase instance + * @throws IOException + */ + public synchronized void shutdown() throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } + + try { + this.master.shutdown(); + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } finally { + this.master = null; + } + } + + /* + * Verifies that the specified table name is not a reserved name + * @param tableName - the table name to be checked + * @throws IllegalArgumentException - if the table name is reserved + */ + protected void checkReservedTableName(Text tableName) { + if(tableName.equals(ROOT_TABLE_NAME) + || tableName.equals(META_TABLE_NAME)) { + + throw new IllegalArgumentException(tableName + " is a reserved table name"); + } + } + + private HRegionLocation getFirstMetaServerForTable(Text tableName) + throws IOException { + SortedMap metaservers = + connection.getTableServers(META_TABLE_NAME); + + return metaservers.get((metaservers.containsKey(tableName)) ? + tableName : metaservers.headMap(tableName).lastKey()); + } + + +} diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java index 9798c870c8d..27b29e5de54 100644 --- a/src/java/org/apache/hadoop/hbase/HClient.java +++ b/src/java/org/apache/hadoop/hbase/HClient.java @@ -17,500 +17,40 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +// temporary until I change all the classes that depend on HClient. package org.apache.hadoop.hbase; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Random; import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; /** - * HClient manages a connection to a single HRegionServer. + * The HClient class is deprecated and is now implemented entirely in terms of + * the classes that replace it: + * + *

+ * HClient continues to be supported in the short term to give users a chance + * to migrate to the use of HConnection, HTable and HBaseAdmin. Any new API + * features which are added will be added to these three classes and will not + * be supported in HClient. */ +@Deprecated public class HClient implements HConstants { - final Log LOG = LogFactory.getLog(this.getClass().getName()); - - static final Text[] META_COLUMNS = { - COLUMN_FAMILY - }; - - private static final Text[] REGIONINFO = { - COL_REGIONINFO - }; - - static final Text EMPTY_START_ROW = new Text(); - - long pause; - int numRetries; - HMasterInterface master; private final Configuration conf; - private AtomicLong currentLockId; - private Class serverInterfaceClass; - private AtomicReference batch; - - /* - * Data structure that holds current location for a region and its info. - */ - @SuppressWarnings("unchecked") - protected static class RegionLocation implements Comparable { - HRegionInfo regionInfo; - HServerAddress serverAddress; - - RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { - this.regionInfo = regionInfo; - this.serverAddress = serverAddress; - } - - /** - * {@inheritDoc} - */ - @Override - public String toString() { - return "address: " + this.serverAddress.toString() + ", regioninfo: " + - this.regionInfo; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean equals(Object o) { - return this.compareTo(o) == 0; - } - - /** - * {@inheritDoc} - */ - @Override - public int hashCode() { - int result = this.regionInfo.hashCode(); - result ^= this.serverAddress.hashCode(); - return result; - } - - /** @return HRegionInfo */ - public HRegionInfo getRegionInfo(){ - return regionInfo; - } - - /** @return HServerAddress */ - public HServerAddress getServerAddress(){ - return serverAddress; - } - - // - // Comparable - // - - /** - * {@inheritDoc} - */ - public int compareTo(Object o) { - RegionLocation other = (RegionLocation) o; - int result = this.regionInfo.compareTo(other.regionInfo); - if(result == 0) { - result = this.serverAddress.compareTo(other.serverAddress); - } - return result; - } - } - - /** encapsulates finding the servers for a table */ - protected class TableServers { - // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) - private TreeMap> tablesToServers; - - /** constructor */ - public TableServers() { - this.tablesToServers = - new TreeMap>(); - } - - /** - * Gets the servers of the given table out of cache, or calls - * findServersForTable if there is nothing in the cache. - * - * @param tableName - the table to be located - * @return map of startRow -> RegionLocation - * @throws IOException - if the table can not be located after retrying - */ - public synchronized SortedMap - getTableServers(Text tableName) throws IOException { - if(tableName == null || tableName.getLength() == 0) { - throw new IllegalArgumentException( - "table name cannot be null or zero length"); - } - SortedMap serverResult = - tablesToServers.get(tableName); - - if (serverResult == null ) { - if (LOG.isDebugEnabled()) { - LOG.debug("No servers for " + tableName + ". Doing a find..."); - } - // We don't know where the table is. - // Load the information from meta. - serverResult = findServersForTable(tableName); - } - return serverResult; - } - - /* - * Clears the cache of all known information about the specified table and - * locates a table by searching the META or ROOT region (as appropriate) or - * by querying the master for the location of the root region if that is the - * table requested. - * - * @param tableName - name of table to find servers for - * @return - map of first row to table info for all regions in the table - * @throws IOException - */ - private SortedMap findServersForTable(Text tableName) - throws IOException { - - // Wipe out everything we know about this table - - if (this.tablesToServers.containsKey(tableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Wiping out all we know of " + tableName); - } - this.tablesToServers.remove(tableName); - } - - SortedMap servers = null; - - if (tableName.equals(ROOT_TABLE_NAME)) { - servers = locateRootRegion(); - - } else if (tableName.equals(META_TABLE_NAME)) { - if (tablesToServers.get(ROOT_TABLE_NAME) == null) { - findServersForTable(ROOT_TABLE_NAME); - } - for (int tries = 0; tries < numRetries; tries++) { - try { - servers = loadMetaFromRoot(); - break; - - } catch (IOException e) { - if (tries < numRetries - 1) { - findServersForTable(ROOT_TABLE_NAME); - continue; - } - throw e; - } - } - } else { - for (int tries = 0; tries < numRetries; tries++) { - boolean success = true; // assume this works - - SortedMap metaServers = - this.tablesToServers.get(META_TABLE_NAME); - if (metaServers == null) { - metaServers = findServersForTable(META_TABLE_NAME); - } - Text firstMetaRegion = metaServers.headMap(tableName).lastKey(); - metaServers = metaServers.tailMap(firstMetaRegion); - - servers = new TreeMap(); - for (RegionLocation t: metaServers.values()) { - try { - servers.putAll(scanOneMetaRegion(t, tableName)); - - } catch (IOException e) { - e.printStackTrace(); - if(tries < numRetries - 1) { - findServersForTable(META_TABLE_NAME); - success = false; - break; - } - throw e; - } - } - if (success) { - break; - } - } - } - this.tablesToServers.put(tableName, servers); - if (LOG.isDebugEnabled()) { - if(servers != null) { - for (Map.Entry e: servers.entrySet()) { - LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + - " for table " + tableName); - } - } - } - return servers; - } - - /* - * Load the meta table from the root table. - * - * @return map of first row to TableInfo for all meta regions - * @throws IOException - */ - private TreeMap loadMetaFromRoot() throws IOException { - SortedMap rootRegion = - this.tablesToServers.get(ROOT_TABLE_NAME); - return scanOneMetaRegion(rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME); - } - - /* - * Repeatedly try to find the root region by asking the master for where it is - * @return TreeMap for root regin if found - * @throws NoServerForRegionException - if the root region can not be located - * after retrying - * @throws IOException - */ - private TreeMap locateRootRegion() throws IOException { - checkMaster(); - - HServerAddress rootRegionLocation = null; - for(int tries = 0; tries < numRetries; tries++) { - int localTimeouts = 0; - while(rootRegionLocation == null && localTimeouts < numRetries) { - rootRegionLocation = master.findRootRegion(); - if(rootRegionLocation == null) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Waiting for root region."); - } - Thread.sleep(pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - localTimeouts++; - } - } - - if(rootRegionLocation == null) { - throw new NoServerForRegionException( - "Timed out trying to locate root region"); - } - - HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); - - try { - rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); - break; - } catch(IOException e) { - if(tries == numRetries - 1) { - // Don't bother sleeping. We've run out of retries. - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - // Sleep and retry finding root region. - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Root region location changed. Sleeping."); - } - Thread.sleep(pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - } - rootRegionLocation = null; - } - - if (rootRegionLocation == null) { - throw new NoServerForRegionException( - "unable to locate root region server"); - } - - TreeMap rootServer = new TreeMap(); - rootServer.put(EMPTY_START_ROW, - new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); - - return rootServer; - } - - /* - * Scans a single meta region - * @param t the meta region we're going to scan - * @param tableName the name of the table we're looking for - * @return returns a map of startingRow to TableInfo - * @throws RegionNotFoundException - if table does not exist - * @throws IllegalStateException - if table is offline - * @throws NoServerForRegionException - if table can not be found after retrying - * @throws IOException - */ - private TreeMap scanOneMetaRegion(final RegionLocation t, - final Text tableName) throws IOException { - HRegionInterface server = getHRegionConnection(t.serverAddress); - TreeMap servers = new TreeMap(); - for(int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { - - long scannerId = -1L; - try { - scannerId = - server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName, - System.currentTimeMillis(), null); - - DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { - HRegionInfo regionInfo = null; - String serverAddress = null; - KeyedData[] values = server.next(scannerId); - if(values.length == 0) { - if(servers.size() == 0) { - // If we didn't find any servers then the table does not exist - throw new TableNotFoundException("table '" + tableName + - "' does not exist in " + t); - } - - // We found at least one server for the table and now we're done. - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " server(s) for " + - "location: " + t + " for tablename " + tableName); - } - break; - } - - byte[] bytes = null; - TreeMap results = new TreeMap(); - for(int i = 0; i < values.length; i++) { - results.put(values[i].getKey().getColumn(), values[i].getData()); - } - regionInfo = new HRegionInfo(); - bytes = results.get(COL_REGIONINFO); - inbuf.reset(bytes, bytes.length); - regionInfo.readFields(inbuf); - - if(!regionInfo.tableDesc.getName().equals(tableName)) { - // We're done - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " servers for table " + - tableName); - } - break; - } - - if(regionInfo.offLine) { - throw new IllegalStateException("table offline: " + tableName); - } - - bytes = results.get(COL_SERVER); - if(bytes == null || bytes.length == 0) { - // We need to rescan because the table we want is unassigned. - if(LOG.isDebugEnabled()) { - LOG.debug("no server address for " + regionInfo.toString()); - } - servers.clear(); - break; - } - serverAddress = new String(bytes, UTF8_ENCODING); - servers.put(regionInfo.startKey, - new RegionLocation(regionInfo, new HServerAddress(serverAddress))); - } - } catch (IOException e) { - if(tries == numRetries - 1) { // no retries left - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - } catch(Exception ex) { - LOG.warn(ex); - } - } - } - - if(servers.size() == 0 && tries == numRetries - 1) { - throw new NoServerForRegionException("failed to find server for " - + tableName + " after " + numRetries + " retries"); - } - - if (servers.size() <= 0) { - // The table is not yet being served. Sleep and retry. - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Table " + tableName + - " not currently being served."); - } - try { - Thread.sleep(pause); - } catch (InterruptedException ie) { - // continue - } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding table " + tableName); - } - } - } - return servers; - } - - /** - * Reloads servers for the specified table. - * @param tableName name of table whose servers are to be reloaded - * @return map of start key -> RegionLocation - * @throws IOException - */ - public synchronized SortedMap - reloadTableServers(final Text tableName) - throws IOException { - // Reload information for the whole table - SortedMap servers = findServersForTable(tableName); - - if (LOG.isDebugEnabled()) { - LOG.debug("Result of findTable: " + servers.toString()); - } - - if (tablesToServers.get(tableName) == null) { - throw new TableNotFoundException(tableName.toString()); - } - - return servers; - } - - } - - protected TableServers tableServers; - - // For the "current" table: Map startRow -> RegionLocation - SortedMap currentTableServers; - - // Known region HServerAddress.toString() -> HRegionInterface - private TreeMap servers; - - // For row mutation operations - - Text currentRegion; - HRegionInterface currentServer; - Random rand; - long clientid; - + protected AtomicReference connection; + protected AtomicReference admin; + protected AtomicReference table; /** * Creates a new HClient @@ -518,152 +58,59 @@ public class HClient implements HConstants { */ public HClient(Configuration conf) { this.conf = conf; - this.batch = new AtomicReference(); - this.currentLockId = new AtomicLong(-1L); - - this.pause = conf.getLong("hbase.client.pause", 30 * 1000); - this.numRetries = conf.getInt("hbase.client.retries.number", 5); - - this.master = null; - this.tableServers = new TableServers(); - this.currentTableServers = null; - this.servers = new TreeMap(); - - // For row mutation operations - - this.currentRegion = null; - this.currentServer = null; - this.rand = new Random(); + this.connection = new AtomicReference(); + this.admin = new AtomicReference(); + this.table = new AtomicReference(); } - /* Find the address of the master and connect to it */ - protected void checkMaster() throws MasterNotRunningException { - if (this.master != null) { - return; + /* Lazily creates a HConnection */ + private synchronized HConnection getHConnection() { + HConnection conn = connection.get(); + if (conn == null) { + conn = HConnectionManager.getConnection(conf); + connection.set(conn); } - - for(int tries = 0; this.master == null && tries < numRetries; tries++) { - HServerAddress masterLocation = - new HServerAddress(this.conf.get(MASTER_ADDRESS, - DEFAULT_MASTER_ADDRESS)); - - try { - HMasterInterface tryMaster = - (HMasterInterface)RPC.getProxy(HMasterInterface.class, - HMasterInterface.versionID, masterLocation.getInetSocketAddress(), - this.conf); - if(tryMaster.isMasterRunning()) { - this.master = tryMaster; - break; - } - } catch(IOException e) { - if(tries == numRetries - 1) { - // This was our last chance - don't bother sleeping - break; - } - LOG.info("Attempt " + tries + " of " + this.numRetries + - " failed with <" + e + ">. Retrying after sleep of " + this.pause); - } - - // We either cannot connect to master or it is not running. Sleep & retry - try { - Thread.sleep(this.pause); - } catch(InterruptedException e) { - // continue - } - } - - if(this.master == null) { - throw new MasterNotRunningException(); - } - } - - /** - * @return - true if the master server is running - */ - public boolean isMasterRunning() { - if(this.master == null) { - try { - checkMaster(); - - } catch(MasterNotRunningException e) { - return false; - } - } - return true; - } - - /** - * Reloads the cached server information for the current table - * - * @param info RegionInfo for a region that is a part of the table - * @throws IOException - */ - protected synchronized void reloadCurrentTable(RegionLocation info) - throws IOException { - this.currentTableServers = tableServers.reloadTableServers( - info.getRegionInfo().getTableDesc().getName()); + return conn; } + /* Lazily creates a HBaseAdmin */ + private synchronized HBaseAdmin getHBaseAdmin() throws MasterNotRunningException { + getHConnection(); // ensure we have a connection + HBaseAdmin adm = admin.get(); + if (adm == null) { + adm = new HBaseAdmin(conf); + admin.set(adm); + } + return adm; + } + /** * Find region location hosting passed row using cached info * @param row Row to find. * @return Location of row. */ - protected synchronized RegionLocation getRegionLocation(Text row) { - if(this.currentTableServers == null) { + protected HRegionLocation getRegionLocation(Text row) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - - // Only one server will have the row we are looking for - Text serverKey = (this.currentTableServers.containsKey(row))? row: - this.currentTableServers.headMap(row).lastKey(); - return this.currentTableServers.get(serverKey); + return table.get().getRegionLocation(row); } - + /** * Establishes a connection to the region server at the specified address. * @param regionServer - the server to connect to * @throws IOException */ - protected synchronized HRegionInterface getHRegionConnection ( + protected HRegionInterface getHRegionConnection( HServerAddress regionServer) throws IOException { - - getRegionServerInterface(); - - // See if we already have a connection - HRegionInterface server = this.servers.get(regionServer.toString()); - - if (server == null) { // Get a connection - long versionId = 0; - try { - versionId = - serverInterfaceClass.getDeclaredField("versionID").getLong(server); - - } catch (IllegalAccessException e) { - // Should never happen unless visibility of versionID changes - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - - } catch (NoSuchFieldException e) { - // Should never happen unless versionID field name changes in HRegionInterface - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - } - - try { - server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, - versionId, regionServer.getInetSocketAddress(), this.conf); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - this.servers.put(regionServer.toString(), server); - } - return server; + return getHConnection().getHRegionConnection(regionServer); + } + + /** + * @return - true if the master server is running + */ + public boolean isMasterRunning() { + return getHConnection().isMasterRunning(); } // @@ -683,12 +130,10 @@ public class HClient implements HConstants { * and attempt-at-creation). * @throws IOException */ - public synchronized void createTable(HTableDescriptor desc) + public void createTable(HTableDescriptor desc) throws IOException { - createTableAsync(desc); - - // Wait for new table to come on-line - tableServers.getTableServers(desc.getName()); + + getHBaseAdmin().createTable(desc); } /** @@ -704,15 +149,10 @@ public class HClient implements HConstants { * and attempt-at-creation). * @throws IOException */ - public synchronized void createTableAsync(HTableDescriptor desc) + public void createTableAsync(HTableDescriptor desc) throws IOException { - checkReservedTableName(desc.getName()); - checkMaster(); - try { - this.master.createTable(desc); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + + getHBaseAdmin().createTableAsync(desc); } /** @@ -721,70 +161,8 @@ public class HClient implements HConstants { * @param tableName name of table to delete * @throws IOException */ - public synchronized void deleteTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.deleteTable(tableName); - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is deleted - HRegionInterface server = - getHRegionConnection(firstMetaServer.serverAddress); - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for (int tries = 0; tries < numRetries; tries++) { - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - break; - } - boolean found = false; - for(int j = 0; j < values.length; j++) { - if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); - if(info.tableDesc.getName().equals(tableName)) { - found = true; - } - } - } - if(!found) { - break; - } - - } catch (IOException ex) { - if(tries == numRetries - 1) { // no more tries left - if(ex instanceof RemoteException) { - ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); - } - throw ex; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - } catch(Exception ex) { - LOG.warn(ex); - } - } - } - - try { - Thread.sleep(pause); - } catch(InterruptedException e) { - // continue - } - } - LOG.info("table " + tableName + " deleted"); + public void deleteTable(Text tableName) throws IOException { + getHBaseAdmin().deleteTable(tableName); } /** @@ -793,88 +171,8 @@ public class HClient implements HConstants { * @param tableName name of the table * @throws IOException */ - public synchronized void enableTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.enableTable(tableName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is enabled - - HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); - - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for(int tries = 0; tries < numRetries; tries++) { - int valuesfound = 0; - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - boolean isenabled = false; - while(true) { - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - break; - } - valuesfound += 1; - for(int j = 0; j < values.length; j++) { - if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); - isenabled = !info.offLine; - break; - } - } - if(isenabled) { - break; - } - } - if(isenabled) { - break; - } - - } catch (IOException e) { - if(tries == numRetries - 1) { // no more retries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - - } catch(Exception e) { - LOG.warn(e); - } - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be enabled from " + tableName); - } - try { - Thread.sleep(pause); - - } catch(InterruptedException e) { - // continue - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Waiting for first region to be enabled from " + tableName); - } - } - LOG.info("Enabled table " + tableName); + public void enableTable(Text tableName) throws IOException { + getHBaseAdmin().enableTable(tableName); } /** @@ -884,104 +182,17 @@ public class HClient implements HConstants { * @param tableName name of table * @throws IOException */ - public synchronized void disableTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.disableTable(tableName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is disabled - - HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); - - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for(int tries = 0; tries < numRetries; tries++) { - int valuesfound = 0; - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - boolean disabled = false; - while(true) { - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - break; - } - valuesfound += 1; - for(int j = 0; j < values.length; j++) { - if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); - disabled = info.offLine; - break; - } - } - if(disabled) { - break; - } - } - if(disabled) { - break; - } - - } catch(IOException e) { - if(tries == numRetries - 1) { // no more retries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - - } catch(Exception e) { - LOG.warn(e); - } - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName); - } - try { - Thread.sleep(pause); - } catch(InterruptedException e) { - // continue - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Waiting for first region to be disabled from " + tableName); - } - } - LOG.info("Disabled table " + tableName); + public void disableTable(Text tableName) throws IOException { + getHBaseAdmin().disableTable(tableName); } /** * @param tableName Table to check. * @return True if table exists already. - * @throws IOException + * @throws MasterNotRunningException */ - public boolean tableExists(final Text tableName) throws IOException { - HTableDescriptor [] tables = listTables(); - boolean result = false; - for (int i = 0; i < tables.length; i++) { - if (tables[i].getName().equals(tableName)) { - result = true; - break; - } - } - return result; + public boolean tableExists(final Text tableName) throws MasterNotRunningException { + return getHBaseAdmin().tableExists(tableName); } /** @@ -991,16 +202,9 @@ public class HClient implements HConstants { * @param column column descriptor of column to be added * @throws IOException */ - public synchronized void addColumn(Text tableName, HColumnDescriptor column) + public void addColumn(Text tableName, HColumnDescriptor column) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.addColumn(tableName, column); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + getHBaseAdmin().addColumn(tableName, column); } /** @@ -1010,53 +214,19 @@ public class HClient implements HConstants { * @param columnName name of column to be deleted * @throws IOException */ - public synchronized void deleteColumn(Text tableName, Text columnName) + public void deleteColumn(Text tableName, Text columnName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.deleteColumn(tableName, columnName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + getHBaseAdmin().deleteColumn(tableName, columnName); } /** * Shuts down the HBase instance * @throws IOException */ - public synchronized void shutdown() throws IOException { - checkMaster(); - try { - this.master.shutdown(); - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + public void shutdown() throws IOException { + getHBaseAdmin().shutdown(); } - /* - * Verifies that the specified table name is not a reserved name - * @param tableName - the table name to be checked - * @throws IllegalArgumentException - if the table name is reserved - */ - protected void checkReservedTableName(Text tableName) { - if(tableName.equals(ROOT_TABLE_NAME) - || tableName.equals(META_TABLE_NAME)) { - - throw new IllegalArgumentException(tableName + " is a reserved table name"); - } - } - - private RegionLocation getFirstMetaServerForTable(Text tableName) - throws IOException { - SortedMap metaservers = - tableServers.getTableServers(META_TABLE_NAME); - - return metaservers.get((metaservers.containsKey(tableName)) ? - tableName : metaservers.headMap(tableName).lastKey()); - } - ////////////////////////////////////////////////////////////////////////////// // Client API ////////////////////////////////////////////////////////////////////////////// @@ -1068,30 +238,22 @@ public class HClient implements HConstants { * @throws IOException if the table can not be located after retrying */ public synchronized void openTable(Text tableName) throws IOException { - if(tableName == null || tableName.getLength() == 0) { - throw new IllegalArgumentException("table name cannot be null or zero length"); + HTable table = this.table.get(); + if (table != null) { + table.checkUpdateInProgress(); } - if(this.currentLockId.get() != -1L || batch.get() != null) { - throw new IllegalStateException("update in progress"); - } - this.currentTableServers = tableServers.getTableServers(tableName); + this.table.set(new HTable(conf, tableName)); } /** * Gets the starting row key for every region in the currently open table * @return Array of region starting row keys */ - public synchronized Text[] getStartKeys() { - if(this.currentTableServers == null) { + public Text[] getStartKeys() { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - - Text[] keys = new Text[currentTableServers.size()]; - int i = 0; - for(Text key: currentTableServers.keySet()){ - keys[i++] = key; - } - return keys; + return table.get().getStartKeys(); } /** @@ -1104,49 +266,8 @@ public class HClient implements HConstants { * @return - returns an array of HTableDescriptors * @throws IOException */ - public synchronized HTableDescriptor[] listTables() - throws IOException { - TreeSet uniqueTables = new TreeSet(); - - SortedMap metaTables = - tableServers.getTableServers(META_TABLE_NAME); - - for (RegionLocation t: metaTables.values()) { - HRegionInterface server = getHRegionConnection(t.serverAddress); - long scannerId = -1L; - try { - scannerId = server.openScanner(t.regionInfo.regionName, - META_COLUMNS, EMPTY_START_ROW, System.currentTimeMillis(), null); - - DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { - KeyedData[] values = server.next(scannerId); - if(values.length == 0) { - break; - } - for(int i = 0; i < values.length; i++) { - if(values[i].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[i].getData(), values[i].getData().length); - HRegionInfo info = new HRegionInfo(); - info.readFields(inbuf); - - // Only examine the rows where the startKey is zero length - if(info.startKey.getLength() == 0) { - uniqueTables.add(info.tableDesc); - } - } - } - } - } catch (RemoteException ex) { - throw RemoteExceptionHandler.decodeRemoteException(ex); - - } finally { - if(scannerId != -1L) { - server.close(scannerId); - } - } - } - return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); + public HTableDescriptor[] listTables() throws IOException { + return getHConnection().listTables(); } /** @@ -1158,31 +279,10 @@ public class HClient implements HConstants { * @throws IOException */ public byte[] get(Text row, Text column) throws IOException { - byte [] value = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - try { - value = server.get(info.regionInfo.regionName, row, column); - break; - - } catch (IOException e) { - if (tries == numRetries - 1) { - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return value; + return this.table.get().get(row, column); } /** @@ -1194,42 +294,12 @@ public class HClient implements HConstants { * @return - array byte values * @throws IOException */ - public byte[][] get(Text row, Text column, int numVersions) throws IOException { - byte [][] values = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - - try { - values = server.get(info.regionInfo.regionName, row, column, numVersions); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } + public byte[][] get(Text row, Text column, int numVersions) + throws IOException { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - - if(values != null) { - ArrayList bytes = new ArrayList(); - for(int i = 0 ; i < values.length; i++) { - bytes.add(values[i]); - } - return bytes.toArray(new byte[values.length][]); - } - return null; + return this.table.get().get(row, column, numVersions); } /** @@ -1245,40 +315,10 @@ public class HClient implements HConstants { */ public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException { - byte [][] values = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - try { - values = server.get(info.regionInfo.regionName, row, column, timestamp, numVersions); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - - if(values != null) { - ArrayList bytes = new ArrayList(); - for(int i = 0 ; i < values.length; i++) { - bytes.add(values[i]); - } - return bytes.toArray(new byte[values.length][]); - } - return null; + return this.table.get().get(row, column, timestamp, numVersions); } /** @@ -1289,39 +329,10 @@ public class HClient implements HConstants { * @throws IOException */ public SortedMap getRow(Text row) throws IOException { - KeyedData[] value = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - - try { - value = server.getRow(info.regionInfo.regionName, row); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - TreeMap results = new TreeMap(); - if(value != null && value.length != 0) { - for(int i = 0; i < value.length; i++) { - results.put(value[i].getKey().getColumn(), value[i].getData()); - } - } - return results; + return this.table.get().getRow(row); } /** @@ -1333,7 +344,7 @@ public class HClient implements HConstants { * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException { return obtainScanner(columns, startRow, System.currentTimeMillis(), null); } @@ -1348,7 +359,7 @@ public class HClient implements HConstants { * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, long timestamp) throws IOException { return obtainScanner(columns, startRow, timestamp, null); } @@ -1363,7 +374,7 @@ public class HClient implements HConstants { * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, RowFilterInterface filter) throws IOException { return obtainScanner(columns, startRow, System.currentTimeMillis(), filter); } @@ -1379,13 +390,13 @@ public class HClient implements HConstants { * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, long timestamp, RowFilterInterface filter) throws IOException { - if(this.currentTableServers == null) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - return new ClientScanner(columns, startRow, timestamp, filter); + return this.table.get().obtainScanner(columns, startRow, timestamp, filter); } /** @@ -1397,30 +408,22 @@ public class HClient implements HConstants { * @param row name of row to be updated * @return lockid to be used in subsequent put, delete and commit calls */ - public synchronized long startBatchUpdate(final Text row) { - if (this.currentTableServers == null) { + public long startBatchUpdate(final Text row) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - if (batch.get() != null) { - throw new IllegalStateException("batch update in progress"); - } - batch.set(new BatchUpdate()); - return batch.get().startUpdate(row); + return this.table.get().startBatchUpdate(row); } /** * Abort a batch mutation * @param lockid lock id returned by startBatchUpdate */ - public synchronized void abortBatch(final long lockid) { - BatchUpdate u = batch.get(); - if (u == null) { - throw new IllegalStateException("no batch update in progress"); + public void abortBatch(final long lockid) { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - if (u.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); - } - batch.set(null); + this.table.get().abortBatch(lockid); } /** @@ -1440,44 +443,12 @@ public class HClient implements HConstants { * @param timestamp time to associate with all the changes * @throws IOException */ - public synchronized void commitBatch(final long lockid, final long timestamp) + public void commitBatch(final long lockid, final long timestamp) throws IOException { - BatchUpdate u = batch.get(); - if (u == null) { - throw new IllegalStateException("no batch update in progress"); - } - if (u.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); - } - - try { - for (int tries = 0; tries < numRetries; tries++) { - RegionLocation r = getRegionLocation(u.getRow()); - HRegionInterface server = getHRegionConnection(r.serverAddress); - try { - server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u); - break; - - } catch (IOException e) { - if (tries < numRetries -1) { - reloadCurrentTable(r); - - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - try { - Thread.sleep(pause); - - } catch (InterruptedException e) { - } - } - } finally { - batch.set(null); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().commitBatch(lockid, timestamp); } /** @@ -1495,47 +466,11 @@ public class HClient implements HConstants { * @return Row lockid. * @throws IOException */ - public synchronized long startUpdate(final Text row) throws IOException { - if (this.currentLockId.get() != -1L) { - throw new IllegalStateException("update in progress"); + public long startUpdate(final Text row) throws IOException { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - if (batch.get() != null) { - throw new IllegalStateException("batch update in progress"); - } - for (int tries = 0; tries < numRetries; tries++) { - IOException e = null; - RegionLocation info = getRegionLocation(row); - try { - currentServer = getHRegionConnection(info.serverAddress); - currentRegion = info.regionInfo.regionName; - clientid = rand.nextLong(); - this.currentLockId.set( - currentServer.startUpdate(currentRegion, clientid, row)); - break; - - } catch (IOException ex) { - e = ex; - } - if (tries < numRetries - 1) { - try { - Thread.sleep(this.pause); - - } catch (InterruptedException ex) { - } - try { - reloadCurrentTable(info); - - } catch (IOException ex) { - e = ex; - } - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - return this.currentLockId.get(); + return this.table.get().startUpdate(row); } /** @@ -1548,33 +483,10 @@ public class HClient implements HConstants { * @throws IOException */ public void put(long lockid, Text column, byte val[]) throws IOException { - if (val == null) { - throw new IllegalArgumentException("value cannot be null"); - } - if (batch.get() != null) { - batch.get().put(lockid, column, val); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.put(this.currentRegion, this.clientid, lockid, column, - val); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().put(lockid, column, val); } /** @@ -1585,30 +497,10 @@ public class HClient implements HConstants { * @throws IOException */ public void delete(long lockid, Text column) throws IOException { - if (batch.get() != null) { - batch.get().delete(lockid, column); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.delete(this.currentRegion, this.clientid, lockid, - column); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch(IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().delete(lockid, column); } /** @@ -1618,26 +510,10 @@ public class HClient implements HConstants { * @throws IOException */ public void abort(long lockid) throws IOException { - if (batch.get() != null) { - abortBatch(lockid); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } finally { - this.currentLockId.set(-1L); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().abort(lockid); } /** @@ -1658,28 +534,10 @@ public class HClient implements HConstants { * @throws IOException */ public void commit(long lockid, long timestamp) throws IOException { - if (batch.get() != null) { - commitBatch(lockid, timestamp); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.commit(this.currentRegion, this.clientid, lockid, - timestamp); - - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } finally { - this.currentLockId.set(-1L); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().commit(lockid, timestamp); } /** @@ -1689,175 +547,12 @@ public class HClient implements HConstants { * @throws IOException */ public void renewLease(long lockid) throws IOException { - if (batch.get() != null) { - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.renewLease(lockid, this.clientid); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().renewLease(lockid); } - /** - * Implements the scanner interface for the HBase client. - * If there are multiple regions in a table, this scanner will iterate - * through them all. - */ - private class ClientScanner implements HScannerInterface { - private final Text EMPTY_COLUMN = new Text(); - private Text[] columns; - private Text startRow; - private long scanTime; - private boolean closed; - private AtomicReferenceArray regions; - @SuppressWarnings("hiding") - private int currentRegion; - private HRegionInterface server; - private long scannerId; - private RowFilterInterface filter; - - private void loadRegions() { - Text firstServer = null; - if(this.startRow == null || this.startRow.getLength() == 0) { - firstServer = currentTableServers.firstKey(); - - } else if(currentTableServers.containsKey(startRow)) { - firstServer = startRow; - - } else { - firstServer = currentTableServers.headMap(startRow).lastKey(); - } - Collection info = - currentTableServers.tailMap(firstServer).values(); - - this.regions = new AtomicReferenceArray( - info.toArray(new RegionLocation[info.size()])); - } - - ClientScanner(Text[] columns, Text startRow, long timestamp, - RowFilterInterface filter) throws IOException { - this.columns = columns; - this.startRow = startRow; - this.scanTime = timestamp; - this.closed = false; - this.filter = filter; - if (filter != null) { - filter.validate(columns); - } - loadRegions(); - this.currentRegion = -1; - this.server = null; - this.scannerId = -1L; - nextScanner(); - } - - /* - * Gets a scanner for the next region. - * Returns false if there are no more scanners. - */ - private boolean nextScanner() throws IOException { - if(this.scannerId != -1L) { - this.server.close(this.scannerId); - this.scannerId = -1L; - } - this.currentRegion += 1; - if(this.currentRegion == this.regions.length()) { - close(); - return false; - } - try { - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = this.regions.get(currentRegion); - this.server = getHRegionConnection(info.serverAddress); - - try { - if (this.filter == null) { - this.scannerId = this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW, scanTime, null); - } else { - this.scannerId = - this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW, scanTime, filter); - } - - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - loadRegions(); - } - } - - } catch(IOException e) { - close(); - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - return true; - } - - /** - * {@inheritDoc} - */ - public boolean next(HStoreKey key, TreeMap results) throws IOException { - if(this.closed) { - return false; - } - KeyedData[] values = null; - do { - values = this.server.next(this.scannerId); - } while(values != null && values.length == 0 && nextScanner()); - - if(values != null && values.length != 0) { - for(int i = 0; i < values.length; i++) { - key.setRow(values[i].getKey().getRow()); - key.setVersion(values[i].getKey().getTimestamp()); - key.setColumn(EMPTY_COLUMN); - results.put(values[i].getKey().getColumn(), values[i].getData()); - } - } - return values == null ? false : values.length != 0; - } - - /** - * {@inheritDoc} - */ - public void close() throws IOException { - if(this.scannerId != -1L) { - this.server.close(this.scannerId); - this.scannerId = -1L; - } - this.server = null; - this.closed = true; - } - } - private void printUsage() { printUsage(null); } @@ -1993,28 +688,6 @@ public class HClient implements HConstants { return errCode; } - /** - * Determine the region server interface to use from configuration properties. - * - */ - @SuppressWarnings("unchecked") - private void getRegionServerInterface() { - if (this.serverInterfaceClass != null) { - return; - } - - String serverClassName = this.conf.get(REGION_SERVER_CLASS, - DEFAULT_REGION_SERVER_CLASS); - - try { - this.serverInterfaceClass = - (Class) Class.forName(serverClassName); - } catch (ClassNotFoundException e) { - throw new UnsupportedOperationException( - "Unable to find region server interface " + serverClassName, e); - } - } - /** * @return the configuration for this client */ diff --git a/src/java/org/apache/hadoop/hbase/HConnection.java b/src/java/org/apache/hadoop/hbase/HConnection.java new file mode 100644 index 00000000000..714c2f1f2d3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HConnection.java @@ -0,0 +1,92 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.SortedMap; + +import org.apache.hadoop.io.Text; + +/** + * + */ +public interface HConnection { + /** + * @return proxy connection to master server for this instance + * @throws MasterNotRunningException + */ + public HMasterInterface getMaster() throws MasterNotRunningException; + + /** @return - true if the master server is running */ + public boolean isMasterRunning(); + + /** + * @param tableName Table to check. + * @return True if table exists already. + */ + public boolean tableExists(final Text tableName); + + /** + * List all the userspace tables. In other words, scan the META table. + * + * If we wanted this to be really fast, we could implement a special + * catalog table that just contains table names and their descriptors. + * Right now, it only exists as part of the META table's region info. + * + * @return - returns an array of HTableDescriptors + * @throws IOException + */ + public HTableDescriptor[] listTables() throws IOException; + + /** + * Gets the servers of the given table. + * + * @param tableName - the table to be located + * @return map of startRow -> RegionLocation + * @throws IOException - if the table can not be located after retrying + */ + public SortedMap getTableServers(Text tableName) + throws IOException; + + /** + * Reloads servers for the specified table. + * + * @param tableName name of table whose servers are to be reloaded + * @return map of start key -> RegionLocation + * @throws IOException + */ + public SortedMap + reloadTableServers(final Text tableName) throws IOException; + + /** + * Establishes a connection to the region server at the specified address. + * @param regionServer - the server to connect to + * @return proxy for HRegionServer + * @throws IOException + */ + public HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException; + + /** + * Discard all the information about this table + * @param tableName the name of the table to close + */ + public void close(Text tableName); +} diff --git a/src/java/org/apache/hadoop/hbase/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/HConnectionManager.java new file mode 100644 index 00000000000..669a07df1b1 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HConnectionManager.java @@ -0,0 +1,761 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.hbase.io.KeyedData; + +/** + * A non-instantiable class that manages connections to multiple tables in + * multiple HBase instances + */ +public class HConnectionManager implements HConstants { + private HConnectionManager(){} // Not instantiable + + // A Map of master HServerAddress -> connection information for that instance + // Note that although the Map is synchronized, the objects it contains + // are mutable and hence require synchronized access to them + + private static final Map HBASE_INSTANCES = + Collections.synchronizedMap(new HashMap()); + + /** + * Get the connection object for the instance specified by the configuration + * If no current connection exists, create a new connection for that instance + * @param conf + * @return HConnection object for the instance specified by the configuration + */ + public static HConnection getConnection(Configuration conf) { + HConnection connection; + synchronized (HBASE_INSTANCES) { + String instanceName = conf.get(HBASE_DIR, DEFAULT_HBASE_DIR); + + connection = HBASE_INSTANCES.get(instanceName); + + if (connection == null) { + connection = new TableServers(conf); + HBASE_INSTANCES.put(instanceName, connection); + } + } + return connection; + } + + /** + * Delete connection information for the instance specified by the configuration + * @param conf + */ + public static void deleteConnection(Configuration conf) { + synchronized (HBASE_INSTANCES) { + HBASE_INSTANCES.remove(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); + } + } + + /* encapsulates finding the servers for an HBase instance */ + private static class TableServers implements HConnection, HConstants { + private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private final Class serverInterfaceClass; + private final long threadWakeFrequency; + private final long pause; + private final int numRetries; + + private final Integer masterLock = new Integer(0); + private volatile HMasterInterface master; + private volatile boolean masterChecked; + + private final Integer rootRegionLock = new Integer(0); + private final Integer metaRegionLock = new Integer(0); + + private volatile Configuration conf; + + // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) + private Map> tablesToServers; + + // Set of closed tables + private Set closedTables; + + // Set of tables currently being located + private HashSet tablesBeingLocated; + + // Known region HServerAddress.toString() -> HRegionInterface + private HashMap servers; + + /** constructor + * @param conf Configuration object + */ + @SuppressWarnings("unchecked") + public TableServers(Configuration conf) { + this.conf = conf; + + String serverClassName = + conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS); + + try { + this.serverInterfaceClass = + (Class) Class.forName(serverClassName); + + } catch (ClassNotFoundException e) { + throw new UnsupportedOperationException( + "Unable to find region server interface " + serverClassName, e); + } + + this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); + + this.master = null; + this.masterChecked = false; + + this.tablesToServers = Collections.synchronizedMap( + new HashMap>()); + + this.closedTables = Collections.synchronizedSet(new HashSet()); + this.tablesBeingLocated = new HashSet(); + + this.servers = new HashMap(); + } + + /** {@inheritDoc} */ + public HMasterInterface getMaster() throws MasterNotRunningException { + synchronized (this.masterLock) { + for (int tries = 0; + !this.masterChecked && this.master == null && tries < numRetries; + tries++) { + + HServerAddress masterLocation = new HServerAddress(this.conf.get( + MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)); + + try { + HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy( + HMasterInterface.class, HMasterInterface.versionID, + masterLocation.getInetSocketAddress(), this.conf); + + if (tryMaster.isMasterRunning()) { + this.master = tryMaster; + break; + } + + } catch (IOException e) { + if(tries == numRetries - 1) { + // This was our last chance - don't bother sleeping + break; + } + LOG.info("Attempt " + tries + " of " + this.numRetries + + " failed with <" + e + ">. Retrying after sleep of " + this.pause); + } + + // We either cannot connect to master or it is not running. Sleep & retry + + try { + Thread.sleep(this.pause); + } catch (InterruptedException e) { + // continue + } + } + this.masterChecked = true; + } + if (this.master == null) { + throw new MasterNotRunningException(); + } + return this.master; + } + + /** {@inheritDoc} */ + public boolean isMasterRunning() { + if (this.master == null) { + try { + getMaster(); + + } catch (MasterNotRunningException e) { + return false; + } + } + return true; + } + + /** {@inheritDoc} */ + public boolean tableExists(final Text tableName) { + boolean exists = true; + try { + SortedMap servers = getTableServers(tableName); + if (servers == null || servers.size() == 0) { + exists = false; + } + + } catch (IOException e) { + exists = false; + } + return exists; + } + + /** {@inheritDoc} */ + public HTableDescriptor[] listTables() throws IOException { + TreeSet uniqueTables = new TreeSet(); + + SortedMap metaTables = + getTableServers(META_TABLE_NAME); + + for (HRegionLocation t: metaTables.values()) { + HRegionInterface server = getHRegionConnection(t.getServerAddress()); + long scannerId = -1L; + try { + scannerId = server.openScanner(t.getRegionInfo().getRegionName(), + COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(), + null); + + DataInputBuffer inbuf = new DataInputBuffer(); + while (true) { + KeyedData[] values = server.next(scannerId); + if (values.length == 0) { + break; + } + for (int i = 0; i < values.length; i++) { + if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) { + inbuf.reset(values[i].getData(), values[i].getData().length); + HRegionInfo info = new HRegionInfo(); + info.readFields(inbuf); + + // Only examine the rows where the startKey is zero length + if (info.startKey.getLength() == 0) { + uniqueTables.add(info.tableDesc); + } + } + } + } + } catch (RemoteException ex) { + throw RemoteExceptionHandler.decodeRemoteException(ex); + + } finally { + if (scannerId != -1L) { + server.close(scannerId); + } + } + } + return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); + } + + /** {@inheritDoc} */ + public SortedMap + getTableServers(Text tableName) throws IOException { + + if (tableName == null || tableName.getLength() == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap tableServers = + tablesToServers.get(tableName); + + if (tableServers == null ) { + if (LOG.isDebugEnabled()) { + LOG.debug("No servers for " + tableName + ". Doing a find..."); + } + // We don't know where the table is. + // Load the information from meta. + tableServers = findServersForTable(tableName); + } + SortedMap servers = + new TreeMap(); + + servers.putAll(tableServers); + return servers; + } + + /** {@inheritDoc} */ + public SortedMap + reloadTableServers(final Text tableName) throws IOException { + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap servers = + new TreeMap(); + + // Reload information for the whole table + + servers.putAll(findServersForTable(tableName)); + if (LOG.isDebugEnabled()) { + LOG.debug("Result of findTable: " + servers.toString()); + } + + return servers; + } + + /** {@inheritDoc} */ + public HRegionInterface getHRegionConnection( + HServerAddress regionServer) throws IOException { + + HRegionInterface server; + synchronized (this.servers) { + // See if we already have a connection + server = this.servers.get(regionServer.toString()); + + if (server == null) { // Get a connection + long versionId = 0; + try { + versionId = + serverInterfaceClass.getDeclaredField("versionID").getLong(server); + + } catch (IllegalAccessException e) { + // Should never happen unless visibility of versionID changes + throw new UnsupportedOperationException( + "Unable to open a connection to a " + + serverInterfaceClass.getName() + " server.", e); + + } catch (NoSuchFieldException e) { + // Should never happen unless versionID field name changes in HRegionInterface + throw new UnsupportedOperationException( + "Unable to open a connection to a " + + serverInterfaceClass.getName() + " server.", e); + } + + try { + server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, + versionId, regionServer.getInetSocketAddress(), this.conf); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + this.servers.put(regionServer.toString(), server); + } + } + return server; + } + + /** {@inheritDoc} */ + public void close(Text tableName) { + if (tableName == null || tableName.getLength() == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap tableServers = + tablesToServers.remove(tableName); + + if (tableServers == null) { + throw new IllegalArgumentException("table was not opened: " + tableName); + } + + closedTables.add(tableName); + + // Shut down connections to the HRegionServers + + synchronized (this.servers) { + for (HRegionLocation r: tableServers.values()) { + this.servers.remove(r.getServerAddress().toString()); + } + } + } + + /* + * Clears the cache of all known information about the specified table and + * locates a table by searching the META or ROOT region (as appropriate) or + * by querying the master for the location of the root region if that is the + * table requested. + * + * @param tableName - name of table to find servers for + * @return - map of first row to table info for all regions in the table + * @throws IOException + */ + private SortedMap findServersForTable(Text tableName) + throws IOException { + + // Wipe out everything we know about this table + + if (this.tablesToServers.remove(tableName) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Wiping out all we know of " + tableName); + } + } + + SortedMap servers = + new TreeMap(); + + if (tableName.equals(ROOT_TABLE_NAME)) { + synchronized (rootRegionLock) { + // This block guards against two threads trying to find the root + // region at the same time. One will go do the find while the + // second waits. The second thread will not do find. + + SortedMap tableServers = + this.tablesToServers.get(ROOT_TABLE_NAME); + + if (tableServers == null) { + tableServers = locateRootRegion(); + } + servers.putAll(tableServers); + } + + } else if (tableName.equals(META_TABLE_NAME)) { + synchronized (metaRegionLock) { + // This block guards against two threads trying to load the meta + // region at the same time. The first will load the meta region and + // the second will use the value that the first one found. + + if (tablesToServers.get(ROOT_TABLE_NAME) == null) { + findServersForTable(ROOT_TABLE_NAME); + } + + SortedMap tableServers = + this.tablesToServers.get(META_TABLE_NAME); + + if (tableServers == null) { + for (int tries = 0; tries < numRetries; tries++) { + try { + tableServers = loadMetaFromRoot(); + break; + + } catch (IOException e) { + if (tries < numRetries - 1) { + findServersForTable(ROOT_TABLE_NAME); + continue; + } + throw e; + } + } + } + servers.putAll(tableServers); + } + } else { + boolean waited = false; + synchronized (this.tablesBeingLocated) { + // This block ensures that only one thread will actually try to + // find a table. If a second thread comes along it will wait + // until the first thread finishes finding the table. + + while (this.tablesBeingLocated.contains(tableName)) { + waited = true; + try { + this.tablesBeingLocated.wait(threadWakeFrequency); + } catch (InterruptedException e) { + } + } + if (!waited) { + this.tablesBeingLocated.add(tableName); + + } else { + SortedMap tableServers = + this.tablesToServers.get(tableName); + + if (tableServers == null) { + throw new TableNotFoundException("table not found: " + tableName); + } + servers.putAll(tableServers); + } + } + if (!waited) { + try { + for (int tries = 0; tries < numRetries; tries++) { + boolean success = true; // assume this works + + SortedMap metaServers = + this.tablesToServers.get(META_TABLE_NAME); + if (metaServers == null) { + metaServers = findServersForTable(META_TABLE_NAME); + } + Text firstMetaRegion = metaServers.headMap(tableName).lastKey(); + metaServers = metaServers.tailMap(firstMetaRegion); + + for (HRegionLocation t: metaServers.values()) { + try { + servers.putAll(scanOneMetaRegion(t, tableName)); + + } catch (IOException e) { + if (tries < numRetries - 1) { + findServersForTable(META_TABLE_NAME); + success = false; + break; + } + throw e; + } + } + if (success) { + break; + } + } + } finally { + synchronized (this.tablesBeingLocated) { + // Wake up the threads waiting for us to find the table + this.tablesBeingLocated.remove(tableName); + this.tablesBeingLocated.notifyAll(); + } + } + } + } + this.tablesToServers.put(tableName, servers); + if (LOG.isDebugEnabled()) { + for (Map.Entry e: servers.entrySet()) { + LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + + " for table " + tableName); + } + } + return servers; + } + + /* + * Load the meta table from the root table. + * + * @return map of first row to TableInfo for all meta regions + * @throws IOException + */ + private TreeMap loadMetaFromRoot() + throws IOException { + + SortedMap rootRegion = + this.tablesToServers.get(ROOT_TABLE_NAME); + + return scanOneMetaRegion( + rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME); + } + + /* + * Repeatedly try to find the root region by asking the master for where it is + * @return TreeMap for root regin if found + * @throws NoServerForRegionException - if the root region can not be located + * after retrying + * @throws IOException + */ + private TreeMap locateRootRegion() + throws IOException { + + getMaster(); + + HServerAddress rootRegionLocation = null; + for (int tries = 0; tries < numRetries; tries++) { + int localTimeouts = 0; + while (rootRegionLocation == null && localTimeouts < numRetries) { + rootRegionLocation = master.findRootRegion(); + if (rootRegionLocation == null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. Waiting for root region."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding root region."); + } + } catch (InterruptedException iex) { + // continue + } + localTimeouts++; + } + } + + if (rootRegionLocation == null) { + throw new NoServerForRegionException( + "Timed out trying to locate root region"); + } + + HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); + + try { + rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // Don't bother sleeping. We've run out of retries. + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } + throw e; + } + + // Sleep and retry finding root region. + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Root region location changed. Sleeping."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding root region."); + } + } catch (InterruptedException iex) { + // continue + } + } + rootRegionLocation = null; + } + + if (rootRegionLocation == null) { + throw new NoServerForRegionException( + "unable to locate root region server"); + } + + TreeMap rootServer = + new TreeMap(); + + rootServer.put(EMPTY_START_ROW, + new HRegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); + + return rootServer; + } + + /* + * Scans a single meta region + * @param t the meta region we're going to scan + * @param tableName the name of the table we're looking for + * @return returns a map of startingRow to TableInfo + * @throws TableNotFoundException - if table does not exist + * @throws IllegalStateException - if table is offline + * @throws NoServerForRegionException - if table can not be found after retrying + * @throws IOException + */ + private TreeMap scanOneMetaRegion( + final HRegionLocation t, final Text tableName) throws IOException { + + HRegionInterface server = getHRegionConnection(t.getServerAddress()); + TreeMap servers = + new TreeMap(); + + for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { + + long scannerId = -1L; + try { + scannerId = + server.openScanner(t.getRegionInfo().getRegionName(), + COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); + + DataInputBuffer inbuf = new DataInputBuffer(); + while (true) { + HRegionInfo regionInfo = null; + String serverAddress = null; + KeyedData[] values = server.next(scannerId); + if (values.length == 0) { + if (servers.size() == 0) { + // If we didn't find any servers then the table does not exist + throw new TableNotFoundException("table '" + tableName + + "' does not exist in " + t); + } + + // We found at least one server for the table and now we're done. + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " server(s) for " + + "location: " + t + " for tablename " + tableName); + } + break; + } + + byte[] bytes = null; + TreeMap results = new TreeMap(); + for (int i = 0; i < values.length; i++) { + results.put(values[i].getKey().getColumn(), values[i].getData()); + } + regionInfo = new HRegionInfo(); + bytes = results.get(COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); + regionInfo.readFields(inbuf); + + if (!regionInfo.tableDesc.getName().equals(tableName)) { + // We're done + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " servers for table " + + tableName); + } + break; + } + + if (regionInfo.offLine) { + throw new IllegalStateException("table offline: " + tableName); + } + + bytes = results.get(COL_SERVER); + if (bytes == null || bytes.length == 0) { + // We need to rescan because the table we want is unassigned. + if (LOG.isDebugEnabled()) { + LOG.debug("no server address for " + regionInfo.toString()); + } + servers.clear(); + break; + } + serverAddress = new String(bytes, UTF8_ENCODING); + servers.put(regionInfo.startKey, new HRegionLocation( + regionInfo, new HServerAddress(serverAddress))); + } + } catch (IOException e) { + if (tries == numRetries - 1) { // no retries left + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if (scannerId != -1L) { + try { + server.close(scannerId); + } catch (Exception ex) { + LOG.warn(ex); + } + } + } + + if (servers.size() == 0 && tries == numRetries - 1) { + throw new NoServerForRegionException("failed to find server for " + + tableName + " after " + numRetries + " retries"); + } + + if (servers.size() <= 0) { + // The table is not yet being served. Sleep and retry. + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. Table " + tableName + + " not currently being served."); + } + try { + Thread.sleep(pause); + } catch (InterruptedException ie) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding table " + tableName); + } + } + } + return servers; + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 80ef62657c7..0ce62a6d915 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -41,7 +41,8 @@ public interface HConstants { /** Parameter name for master address */ static final String MASTER_ADDRESS = "hbase.master"; - + + /** default host address */ static final String DEFAULT_HOST = "0.0.0.0"; /** Default master address */ @@ -100,11 +101,15 @@ public interface HConstants { /** The ROOT and META column family */ static final Text COLUMN_FAMILY = new Text("info:"); - + + /** Array of meta column names */ static final Text [] COLUMN_FAMILY_ARRAY = new Text [] {COLUMN_FAMILY}; /** ROOT/META column family member - contains HRegionInfo */ static final Text COL_REGIONINFO = new Text(COLUMN_FAMILY + "regioninfo"); + + /** Array of column - contains HRegionInfo */ + static final Text[] COL_REGIONINFO_ARRAY = new Text [] {COL_REGIONINFO}; /** ROOT/META column family member - contains HServerAddress.toString() */ static final Text COL_SERVER = new Text(COLUMN_FAMILY + "server"); @@ -114,6 +119,9 @@ public interface HConstants { // Other constants + /** used by scanners, etc when they want to start at the beginning of a region */ + static final Text EMPTY_START_ROW = new Text(); + /** When we encode strings, we always specify UTF8 encoding */ static final String UTF8_ENCODING = "UTF-8"; diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index 9126228e20e..a4e091028cf 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -942,25 +942,29 @@ public class HMaster implements HConstants, HMasterInterface, // HRegionServer is shutting down. Cancel the server's lease. - LOG.info("Region server " + s + ": MSG_REPORT_EXITING"); - cancelLease(s, serverLabel); + if (cancelLease(s, serverLabel)) { + // Only process the exit message if the server still has a lease. + // Otherwise we could end up processing the server exit twice. + + LOG.info("Region server " + s + ": MSG_REPORT_EXITING"); - // Get all the regions the server was serving reassigned - // (if we are not shutting down). - - if (!closed) { - for (int i = 1; i < msgs.length; i++) { - HRegionInfo info = msgs[i].getRegionInfo(); - - if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { - rootRegionLocation = null; - - } else if (info.tableDesc.getName().equals(META_TABLE_NAME)) { - onlineMetaRegions.remove(info.getStartKey()); + // Get all the regions the server was serving reassigned + // (if we are not shutting down). + + if (!closed) { + for (int i = 1; i < msgs.length; i++) { + HRegionInfo info = msgs[i].getRegionInfo(); + + if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { + rootRegionLocation = null; + + } else if (info.tableDesc.getName().equals(META_TABLE_NAME)) { + onlineMetaRegions.remove(info.getStartKey()); + } + + unassignedRegions.put(info.regionName, info); + assignAttempts.put(info.regionName, Long.valueOf(0L)); } - - unassignedRegions.put(info.regionName, info); - assignAttempts.put(info.regionName, Long.valueOf(0L)); } } @@ -1021,14 +1025,16 @@ public class HMaster implements HConstants, HMasterInterface, } /** cancel a server's lease */ - private void cancelLease(final String serverName, final long serverLabel) - throws IOException { + private boolean cancelLease(final String serverName, final long serverLabel) { + boolean leaseCancelled = false; if (serversToServerInfo.remove(serverName) != null) { // Only cancel lease once. // This method can be called a couple of times during shutdown. LOG.info("Cancelling lease for " + serverName); serverLeases.cancelLease(serverLabel, serverLabel); + leaseCancelled = true; } + return leaseCancelled; } /** Process all the incoming messages from a server that's contacted us. */ @@ -1721,6 +1727,10 @@ public class HMaster implements HConstants, HMasterInterface, if (rootRegionLocation == null || !rootScanned) { // We can't proceed until the root region is online and has been // scanned + if (LOG.isDebugEnabled()) { + LOG.debug("root region=" + rootRegionLocation.toString() + + ", rootScanned=" + rootScanned); + } return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; @@ -1735,6 +1745,11 @@ public class HMaster implements HConstants, HMasterInterface, // online message from being processed. So return false to have this // operation requeued. + if (LOG.isDebugEnabled()) { + LOG.debug("rootScanned=" + rootScanned + ", numberOfMetaRegions=" + + numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + + onlineMetaRegions.size()); + } return false; } @@ -2474,16 +2489,15 @@ public class HMaster implements HConstants, HMasterInterface, */ public void leaseExpired() { LOG.info(server + " lease expired"); + + // Remove the server from the known servers list + HServerInfo storedInfo = serversToServerInfo.remove(server); - if(rootRegionLocation != null - && rootRegionLocation.toString().equals( - storedInfo.getServerAddress().toString())) { - - rootRegionLocation = null; - unassignedRegions.put(HGlobals.rootRegionInfo.regionName, - HGlobals.rootRegionInfo); - assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); - } + + // NOTE: If the server was serving the root region, we cannot reassign it + // here because the new server will start serving the root region before + // the PendingServerShutdown operation has a chance to split the log file. + try { msgQueue.put(new PendingServerShutdown(storedInfo)); } catch (InterruptedException e) { diff --git a/src/java/org/apache/hadoop/hbase/HRegionLocation.java b/src/java/org/apache/hadoop/hbase/HRegionLocation.java new file mode 100644 index 00000000000..b5eb3da102a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HRegionLocation.java @@ -0,0 +1,94 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Contains the HRegionInfo for the region and the HServerAddress for the + * HRegionServer serving the region + */ +@SuppressWarnings("unchecked") +public class HRegionLocation implements Comparable { + private HRegionInfo regionInfo; + private HServerAddress serverAddress; + + /** + * Constructor + * + * @param regionInfo the HRegionInfo for the region + * @param serverAddress the HServerAddress for the region server + */ + public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { + this.regionInfo = regionInfo; + this.serverAddress = serverAddress; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "address: " + this.serverAddress.toString() + ", regioninfo: " + + this.regionInfo; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) { + return this.compareTo(o) == 0; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + int result = this.regionInfo.hashCode(); + result ^= this.serverAddress.hashCode(); + return result; + } + + /** @return HRegionInfo */ + public HRegionInfo getRegionInfo(){ + return regionInfo; + } + + /** @return HServerAddress */ + public HServerAddress getServerAddress(){ + return serverAddress; + } + + // + // Comparable + // + + /** + * {@inheritDoc} + */ + public int compareTo(Object o) { + HRegionLocation other = (HRegionLocation) o; + int result = this.regionInfo.compareTo(other.regionInfo); + if(result == 0) { + result = this.serverAddress.compareTo(other.serverAddress); + } + return result; + } +} diff --git a/src/java/org/apache/hadoop/hbase/HTable.java b/src/java/org/apache/hadoop/hbase/HTable.java new file mode 100644 index 00000000000..fcc89541af8 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HTable.java @@ -0,0 +1,850 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Used to communicate with a single HBase table + */ +public class HTable implements HConstants { + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + protected final HConnection connection; + protected final Text tableName; + protected final long pause; + protected final int numRetries; + protected Random rand; + protected volatile SortedMap tableServers; + protected BatchUpdate batch; + + // For row mutation operations + + protected volatile long currentLockId; + protected volatile Text currentRegion; + protected volatile HRegionInterface currentServer; + protected volatile long clientid; + + protected volatile boolean closed; + + /** + * Creates an object to access a HBase table + * + * @param conf configuration object + * @param tableName name of the table + * @throws IOException + */ + public HTable(Configuration conf, Text tableName) throws IOException { + closed = true; + this.connection = HConnectionManager.getConnection(conf); + this.tableName = tableName; + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); + this.rand = new Random(); + tableServers = connection.getTableServers(tableName); + this.batch = null; + this.currentLockId = -1L; + closed = false; + } + + /** + * Find region location hosting passed row using cached info + * @param row Row to find. + * @return Location of row. + */ + HRegionLocation getRegionLocation(Text row) { + if (this.tableServers == null) { + throw new IllegalStateException("Must open table first"); + } + + // Only one server will have the row we are looking for + Text serverKey = (this.tableServers.containsKey(row)) ? + row : this.tableServers.headMap(row).lastKey(); + return this.tableServers.get(serverKey); + } + + /** + * Verifies that no update is in progress + */ + public synchronized void checkUpdateInProgress() { + if (batch != null || currentLockId != -1L) { + throw new IllegalStateException("update in progress"); + } + } + + /** + * Gets the starting row key for every region in the currently open table + * @return Array of region starting row keys + */ + public Text[] getStartKeys() { + if (closed) { + throw new IllegalStateException("table is closed"); + } + Text[] keys = new Text[tableServers.size()]; + int i = 0; + for(Text key: tableServers.keySet()){ + keys[i++] = key; + } + return keys; + } + + /** + * Get a single value for the specified row and column + * + * @param row row key + * @param column column name + * @return value for specified row/column + * @throws IOException + */ + public byte[] get(Text row, Text column) throws IOException { + byte [] value = null; + for(int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + value = server.get(r.getRegionInfo().getRegionName(), row, column); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + return value; + } + + /** + * Get the specified number of versions of the specified row and column + * + * @param row - row key + * @param column - column name + * @param numVersions - number of versions to retrieve + * @return - array byte values + * @throws IOException + */ + public byte[][] get(Text row, Text column, int numVersions) throws IOException { + byte [][] values = null; + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + values = server.get(r.getRegionInfo().getRegionName(), row, column, + numVersions); + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + + if (values != null) { + ArrayList bytes = new ArrayList(); + for (int i = 0 ; i < values.length; i++) { + bytes.add(values[i]); + } + return bytes.toArray(new byte[values.length][]); + } + return null; + } + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. + * + * @param row - row key + * @param column - column name + * @param timestamp - timestamp + * @param numVersions - number of versions to retrieve + * @return - array of values that match the above criteria + * @throws IOException + */ + public byte[][] get(Text row, Text column, long timestamp, int numVersions) + throws IOException { + byte [][] values = null; + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + values = server.get(r.getRegionInfo().getRegionName(), row, column, + timestamp, numVersions); + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + + if (values != null) { + ArrayList bytes = new ArrayList(); + for (int i = 0 ; i < values.length; i++) { + bytes.add(values[i]); + } + return bytes.toArray(new byte[values.length][]); + } + return null; + } + + /** + * Get all the data for the specified row + * + * @param row - row key + * @return - map of colums to values + * @throws IOException + */ + public SortedMap getRow(Text row) throws IOException { + KeyedData[] value = null; + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + value = server.getRow(r.getRegionInfo().getRegionName(), row); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + TreeMap results = new TreeMap(); + if (value != null && value.length != 0) { + for (int i = 0; i < value.length; i++) { + results.put(value[i].getKey().getColumn(), value[i].getData()); + } + } + return results; + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow) throws IOException { + + return obtainScanner(columns, startRow, System.currentTimeMillis(), null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, long timestamp) throws IOException { + + return obtainScanner(columns, startRow, timestamp, null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, RowFilterInterface filter) throws IOException { + + return obtainScanner(columns, startRow, System.currentTimeMillis(), filter); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, long timestamp, RowFilterInterface filter) + throws IOException { + + return new ClientScanner(columns, startRow, timestamp, filter); + } + + /** + * Start a batch of row insertions/updates. + * + * No changes are committed until the call to commitBatchUpdate returns. + * A call to abortBatchUpdate will abandon the entire batch. + * + * @param row name of row to be updated + * @return lockid to be used in subsequent put, delete and commit calls + */ + public synchronized long startBatchUpdate(final Text row) { + if (batch != null || currentLockId != -1L) { + throw new IllegalStateException("update in progress"); + } + batch = new BatchUpdate(); + return batch.startUpdate(row); + } + + /** + * Abort a batch mutation + * @param lockid lock id returned by startBatchUpdate + */ + public synchronized void abortBatch(final long lockid) { + if (batch == null) { + throw new IllegalStateException("no batch update in progress"); + } + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + batch = null; + } + + /** + * Finalize a batch mutation + * + * @param lockid lock id returned by startBatchUpdate + * @throws IOException + */ + public void commitBatch(final long lockid) throws IOException { + commitBatch(lockid, System.currentTimeMillis()); + } + + /** + * Finalize a batch mutation + * + * @param lockid lock id returned by startBatchUpdate + * @param timestamp time to associate with all the changes + * @throws IOException + */ + public synchronized void commitBatch(final long lockid, final long timestamp) + throws IOException { + + if (batch == null) { + throw new IllegalStateException("no batch update in progress"); + } + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + + try { + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(batch.getRow()); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch); + break; + + } catch (IOException e) { + if (tries < numRetries -1) { + tableServers = connection.reloadTableServers(tableName); + + } else { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + try { + Thread.sleep(pause); + + } catch (InterruptedException e) { + } + } + } finally { + batch = null; + } + } + + /** + * Start an atomic row insertion/update. No changes are committed until the + * call to commit() returns. A call to abort() will abandon any updates in progress. + * + * Callers to this method are given a lease for each unique lockid; before the + * lease expires, either abort() or commit() must be called. If it is not + * called, the system will automatically call abort() on the client's behalf. + * + * The client can gain extra time with a call to renewLease(). + * Start an atomic row insertion or update + * + * @param row Name of row to start update against. + * @return Row lockid. + * @throws IOException + */ + public synchronized long startUpdate(final Text row) throws IOException { + if (currentLockId != -1L || batch != null) { + throw new IllegalStateException("update in progress"); + } + for (int tries = 0; tries < numRetries; tries++) { + IOException e = null; + HRegionLocation info = getRegionLocation(row); + try { + currentServer = + connection.getHRegionConnection(info.getServerAddress()); + + currentRegion = info.getRegionInfo().getRegionName(); + clientid = rand.nextLong(); + currentLockId = currentServer.startUpdate(currentRegion, clientid, row); + + break; + + } catch (IOException ex) { + e = ex; + } + if (tries < numRetries - 1) { + try { + Thread.sleep(this.pause); + + } catch (InterruptedException ex) { + } + try { + tableServers = connection.reloadTableServers(tableName); + + } catch (IOException ex) { + e = ex; + } + } else { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + return currentLockId; + } + + /** + * Change a value for the specified column. + * Runs {@link #abort(long)} if exception thrown. + * + * @param lockid lock id returned from startUpdate + * @param column column whose value is being set + * @param val new value for column + * @throws IOException + */ + public void put(long lockid, Text column, byte val[]) throws IOException { + if (val == null) { + throw new IllegalArgumentException("value cannot be null"); + } + if (batch != null) { + batch.put(lockid, column, val); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.put(this.currentRegion, this.clientid, lockid, column, + val); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch (IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Delete the value for a column + * + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + * @throws IOException + */ + public void delete(long lockid, Text column) throws IOException { + if (batch != null) { + batch.delete(lockid, column); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.delete(this.currentRegion, this.clientid, lockid, + column); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch(IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Abort a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public synchronized void abort(long lockid) throws IOException { + if (batch != null) { + abortBatch(lockid); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + + try { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch (IOException e) { + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } finally { + currentLockId = -1L; + } + } + + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public void commit(long lockid) throws IOException { + commit(lockid, System.currentTimeMillis()); + } + + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @param timestamp - time to associate with the change + * @throws IOException + */ + public synchronized void commit(long lockid, long timestamp) throws IOException { + if (batch != null) { + commitBatch(lockid, timestamp); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + + try { + try { + this.currentServer.commit(this.currentRegion, this.clientid, lockid, + timestamp); + + } catch (IOException e) { + this.currentServer = null; + this.currentRegion = null; + if(e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } finally { + currentLockId = -1L; + } + } + + /** + * Renew lease on update + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public synchronized void renewLease(long lockid) throws IOException { + if (batch != null) { + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.renewLease(lockid, this.clientid); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch (IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Implements the scanner interface for the HBase client. + * If there are multiple regions in a table, this scanner will iterate + * through them all. + */ + protected class ClientScanner implements HScannerInterface { + private final Text EMPTY_COLUMN = new Text(); + private Text[] columns; + private Text startRow; + private long scanTime; + @SuppressWarnings("hiding") + private boolean closed; + private AtomicReferenceArray regions; + @SuppressWarnings("hiding") + private int currentRegion; + private HRegionInterface server; + private long scannerId; + private RowFilterInterface filter; + + private void loadRegions() { + Text firstServer = null; + if (this.startRow == null || this.startRow.getLength() == 0) { + firstServer = tableServers.firstKey(); + + } else if(tableServers.containsKey(startRow)) { + firstServer = startRow; + + } else { + firstServer = tableServers.headMap(startRow).lastKey(); + } + Collection info = + tableServers.tailMap(firstServer).values(); + + this.regions = new AtomicReferenceArray( + info.toArray(new HRegionLocation[info.size()])); + } + + protected ClientScanner(Text[] columns, Text startRow, long timestamp, + RowFilterInterface filter) throws IOException { + + this.columns = columns; + this.startRow = startRow; + this.scanTime = timestamp; + this.closed = false; + this.filter = filter; + if (filter != null) { + filter.validate(columns); + } + loadRegions(); + this.currentRegion = -1; + this.server = null; + this.scannerId = -1L; + nextScanner(); + } + + /* + * Gets a scanner for the next region. + * Returns false if there are no more scanners. + */ + private boolean nextScanner() throws IOException { + if (this.scannerId != -1L) { + this.server.close(this.scannerId); + this.scannerId = -1L; + } + this.currentRegion += 1; + if (this.currentRegion == this.regions.length()) { + close(); + return false; + } + try { + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = this.regions.get(currentRegion); + this.server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + if (this.filter == null) { + this.scannerId = + this.server.openScanner(r.getRegionInfo().getRegionName(), + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, scanTime, null); + + } else { + this.scannerId = + this.server.openScanner(r.getRegionInfo().getRegionName(), + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, scanTime, filter); + } + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + loadRegions(); + } + } + + } catch (IOException e) { + close(); + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + return true; + } + + /** + * {@inheritDoc} + */ + public boolean next(HStoreKey key, TreeMap results) throws IOException { + if (this.closed) { + return false; + } + KeyedData[] values = null; + do { + values = this.server.next(this.scannerId); + } while (values != null && values.length == 0 && nextScanner()); + + if (values != null && values.length != 0) { + for (int i = 0; i < values.length; i++) { + key.setRow(values[i].getKey().getRow()); + key.setVersion(values[i].getKey().getTimestamp()); + key.setColumn(EMPTY_COLUMN); + results.put(values[i].getKey().getColumn(), values[i].getData()); + } + } + return values == null ? false : values.length != 0; + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException { + if (this.scannerId != -1L) { + this.server.close(this.scannerId); + this.scannerId = -1L; + } + this.server = null; + this.closed = true; + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/Leases.java b/src/java/org/apache/hadoop/hbase/Leases.java index 4e714da89ab..e4d025e1068 100644 --- a/src/java/org/apache/hadoop/hbase/Leases.java +++ b/src/java/org/apache/hadoop/hbase/Leases.java @@ -180,10 +180,8 @@ public class Leases { * * @param holderId id of lease holder * @param resourceId id of resource being leased - * @throws IOException */ - public void cancelLease(final long holderId, final long resourceId) - throws IOException { + public void cancelLease(final long holderId, final long resourceId) { LeaseName name = null; synchronized(leases) { synchronized(sortedLeases) { @@ -191,9 +189,8 @@ public class Leases { Lease lease = leases.get(name); if (lease == null) { // It's possible that someone tries to renew the lease, but - // it just expired a moment ago. So fail. - throw new IOException("Cannot cancel lease that is not held: " + - name); + // it just expired a moment ago. So just skip it. + return; } sortedLeases.remove(lease); leases.remove(name); @@ -206,6 +203,7 @@ public class Leases { /** LeaseMonitor is a thread that expires Leases that go on too long. */ class LeaseMonitor implements Runnable { + /** {@inheritDoc} */ public void run() { while(running) { synchronized(leases) { @@ -236,6 +234,7 @@ public class Leases { * A Lease name. * More lightweight than String or Text. */ + @SuppressWarnings("unchecked") class LeaseName implements Comparable { private final long holderId; private final long resourceId; @@ -245,6 +244,7 @@ public class Leases { this.resourceId = rid; } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { LeaseName other = (LeaseName)obj; @@ -252,6 +252,7 @@ public class Leases { this.resourceId == other.resourceId; } + /** {@inheritDoc} */ @Override public int hashCode() { // Copy OR'ing from javadoc for Long#hashCode. @@ -260,12 +261,14 @@ public class Leases { return result; } + /** {@inheritDoc} */ @Override public String toString() { return Long.toString(this.holderId) + "/" + Long.toString(this.resourceId); } + /** {@inheritDoc} */ public int compareTo(Object obj) { LeaseName other = (LeaseName)obj; if (this.holderId < other.holderId) { @@ -292,6 +295,7 @@ public class Leases { } /** This class tracks a single Lease. */ + @SuppressWarnings("unchecked") private class Lease implements Comparable { final long holderId; final long resourceId; @@ -329,11 +333,13 @@ public class Leases { listener.leaseExpired(); } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { return compareTo(obj) == 0; } + /** {@inheritDoc} */ @Override public int hashCode() { int result = this.getLeaseName().hashCode(); @@ -345,6 +351,7 @@ public class Leases { // Comparable ////////////////////////////////////////////////////////////////////////////// + /** {@inheritDoc} */ public int compareTo(Object o) { Lease other = (Lease) o; if(this.lastUpdate < other.lastUpdate) { diff --git a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java index 3e1d0cf4a8d..206ef8fec61 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java @@ -53,6 +53,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { this.regionServers = 1; } + /** {@inheritDoc} */ @Override public void setUp() throws Exception { super.setUp(); @@ -60,11 +61,13 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs); } + /** {@inheritDoc} */ @Override public void tearDown() throws Exception { super.tearDown(); if (this.cluster != null) { this.cluster.shutdown(); } + HConnectionManager.deleteConnection(conf); } } diff --git a/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java b/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java index 79b03fa735b..0cd8da385cb 100644 --- a/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java +++ b/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.TreeMap; import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; /** * Tests region server failover when a region server exits. @@ -36,6 +38,8 @@ public class TestCleanRegionServerExit extends HBaseClusterTestCase { conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + Logger.getRootLogger().setLevel(Level.WARN); + Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } /** diff --git a/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java b/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java index aceb212ec18..c1725a07fd4 100644 --- a/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java +++ b/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java @@ -38,7 +38,8 @@ public class TestRegionServerAbort extends HBaseClusterTestCase { conf.setInt("ipc.client.timeout", 5000); // reduce client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries -// Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); + Logger.getRootLogger().setLevel(Level.WARN); + Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } /** diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java index d426b15d803..c98caf9b096 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner2.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java @@ -69,7 +69,7 @@ public class TestScanner2 extends HBaseClusterTestCase { // Setup colkeys to be inserted HTableDescriptor htd = new HTableDescriptor(getName()); Text tableName = new Text(getName()); - Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1]; + Text[] colKeys = new Text[(LAST_COLKEY - FIRST_COLKEY) + 1]; for (char i = 0; i < colKeys.length; i++) { colKeys[i] = new Text(new String(new char[] { (char)(FIRST_COLKEY + i), ':' })); @@ -201,9 +201,9 @@ public class TestScanner2 extends HBaseClusterTestCase { long scannerId = -1L; try { client.openTable(table); - HClient.RegionLocation rl = client.getRegionLocation(table); - regionServer = client.getHRegionConnection(rl.serverAddress); - scannerId = regionServer.openScanner(rl.regionInfo.regionName, + HRegionLocation rl = client.getRegionLocation(table); + regionServer = client.getHRegionConnection(rl.getServerAddress()); + scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(), HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap();