HADOOP-1528 HClient for multiple tables (phase 1)
Modified: HConstants static final Text[] COL_REGIONINFO_ARRAY = new Text [] {COL_REGIONINFO}; static final Text EMPTY_START_ROW = new Text(); HMaster - don't process a region server exit message if the lease has timed out. Otherwise we end up with two pending server shutdown messages to process and chaos ensues. - don't reassign the root region when the server's lease expires. The lease expiration handler will queue a PendingServerShutdown operation that must run before the root region is reassigned because the HLog of the dead server must be split before any regions served by the dead server are reassigned. - added some additional debug level logging HBaseClusterTestCase - call HConnectionManager.deleteConnection(conf) in tearDown() so that multiple tests can be run from the same test class. TestScanner2 - changes to make test compatible with the change from inner class HClient.RegionLocation to public class HRegionLocation Leases - cancelLease just returns if the lease is not found instead of throwing an IOException New: HConnection - an interface that describes the operations performed by a connection implementation HConnectionManager - manages connections for multiple HBase instances and returns an object that implements HConnection from its static method getConnection HBaseAdmin - the HBase administrative methods refactored out of HClient. Each HBaseAdmin object can control a single HBase instance. To manipulate multiple instances, create multiple HBaseAdmin objects. HTable - The data manipulation methods refactored out of HClient. Each HTable object talks to a single table in a single HBase instance. Create multiple HTable objects to use more than one table. HRegionLocation - an inner class refactored out of HClient. Each HRegionLocation has an HRegionInfo object and an HServerAddress object. HClient - totally re-implemented in terms of the new classes above. HClient is now deprecated. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@561935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a9acbeab08
commit
2bbcc5a122
|
@ -79,3 +79,5 @@ Trunk (unreleased changes)
|
|||
10 concurrent clients
|
||||
50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
|
||||
to a single row at a time)
|
||||
51. HADOOP-1528 HClient for multiple tables (phase 1)
|
||||
|
||||
|
|
|
@ -0,0 +1,481 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
* Provides administrative functions for HBase
|
||||
*/
|
||||
public class HBaseAdmin implements HConstants {
|
||||
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
|
||||
protected final HConnection connection;
|
||||
protected final long pause;
|
||||
protected final int numRetries;
|
||||
protected volatile HMasterInterface master;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param conf Configuration object
|
||||
* @throws MasterNotRunningException
|
||||
*/
|
||||
public HBaseAdmin(Configuration conf) throws MasterNotRunningException {
|
||||
this.connection = HConnectionManager.getConnection(conf);
|
||||
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
||||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
this.master = connection.getMaster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new table
|
||||
*
|
||||
* @param desc table descriptor for table
|
||||
*
|
||||
* @throws IllegalArgumentException if the table name is reserved
|
||||
* @throws MasterNotRunningException if master is not running
|
||||
* @throws NoServerForRegionException if root region is not being served
|
||||
* @throws TableExistsException if table already exists (If concurrent
|
||||
* threads, the table may have been created between test-for-existence
|
||||
* and attempt-at-creation).
|
||||
* @throws IOException
|
||||
*/
|
||||
public void createTable(HTableDescriptor desc)
|
||||
throws IOException {
|
||||
|
||||
createTableAsync(desc);
|
||||
|
||||
// Wait for new table to come on-line
|
||||
connection.getTableServers(desc.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new table but does not block and wait for it to come online.
|
||||
*
|
||||
* @param desc table descriptor for table
|
||||
*
|
||||
* @throws IllegalArgumentException if the table name is reserved
|
||||
* @throws MasterNotRunningException if master is not running
|
||||
* @throws NoServerForRegionException if root region is not being served
|
||||
* @throws TableExistsException if table already exists (If concurrent
|
||||
* threads, the table may have been created between test-for-existence
|
||||
* and attempt-at-creation).
|
||||
* @throws IOException
|
||||
*/
|
||||
public void createTableAsync(HTableDescriptor desc)
|
||||
throws IOException {
|
||||
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(desc.getName());
|
||||
try {
|
||||
this.master.createTable(desc);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a table
|
||||
*
|
||||
* @param tableName name of table to delete
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteTable(Text tableName) throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(tableName);
|
||||
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
|
||||
|
||||
try {
|
||||
this.master.deleteTable(tableName);
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
|
||||
// Wait until first region is deleted
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId =
|
||||
server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
|
||||
COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values == null || values.length == 0) {
|
||||
break;
|
||||
}
|
||||
boolean found = false;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
if (info.tableDesc.getName().equals(tableName)) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (IOException ex) {
|
||||
if(tries == numRetries - 1) { // no more tries left
|
||||
if (ex instanceof RemoteException) {
|
||||
ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex);
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
server.close(scannerId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
LOG.info("table " + tableName + " deleted");
|
||||
}
|
||||
|
||||
/**
|
||||
* Brings a table on-line (enables it)
|
||||
*
|
||||
* @param tableName name of the table
|
||||
* @throws IOException
|
||||
*/
|
||||
public void enableTable(Text tableName) throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(tableName);
|
||||
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
|
||||
|
||||
try {
|
||||
this.master.enableTable(tableName);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
|
||||
// Wait until first region is enabled
|
||||
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int valuesfound = 0;
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId =
|
||||
server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
|
||||
COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
boolean isenabled = false;
|
||||
|
||||
while (true) {
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values == null || values.length == 0) {
|
||||
if (valuesfound == 0) {
|
||||
throw new NoSuchElementException(
|
||||
"table " + tableName + " not found");
|
||||
}
|
||||
break;
|
||||
}
|
||||
valuesfound += 1;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
isenabled = !info.offLine;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (isenabled) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (isenabled) {
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) { // no more retries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
server.close(scannerId);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleep. Waiting for first region to be enabled from " +
|
||||
tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Waiting for first region to be enabled from " +
|
||||
tableName);
|
||||
}
|
||||
}
|
||||
LOG.info("Enabled table " + tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disables a table (takes it off-line) If it is being served, the master
|
||||
* will tell the servers to stop serving it.
|
||||
*
|
||||
* @param tableName name of table
|
||||
* @throws IOException
|
||||
*/
|
||||
public void disableTable(Text tableName) throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(tableName);
|
||||
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
|
||||
|
||||
try {
|
||||
this.master.disableTable(tableName);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
|
||||
// Wait until first region is disabled
|
||||
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
int valuesfound = 0;
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId =
|
||||
server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
|
||||
COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
|
||||
boolean disabled = false;
|
||||
while (true) {
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values == null || values.length == 0) {
|
||||
if (valuesfound == 0) {
|
||||
throw new NoSuchElementException("table " + tableName + " not found");
|
||||
}
|
||||
break;
|
||||
}
|
||||
valuesfound += 1;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
disabled = info.offLine;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (disabled) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (disabled) {
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) { // no more retries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
server.close(scannerId);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleep. Waiting for first region to be disabled from " +
|
||||
tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Waiting for first region to be disabled from " +
|
||||
tableName);
|
||||
}
|
||||
}
|
||||
LOG.info("Disabled table " + tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tableName Table to check.
|
||||
* @return True if table exists already.
|
||||
* @throws MasterNotRunningException
|
||||
*/
|
||||
public boolean tableExists(final Text tableName) throws MasterNotRunningException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
return connection.tableExists(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a column to an existing table
|
||||
*
|
||||
* @param tableName name of the table to add column to
|
||||
* @param column column descriptor of column to be added
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addColumn(Text tableName, HColumnDescriptor column)
|
||||
throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(tableName);
|
||||
try {
|
||||
this.master.addColumn(tableName, column);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a column from a table
|
||||
*
|
||||
* @param tableName name of table
|
||||
* @param columnName name of column to be deleted
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteColumn(Text tableName, Text columnName)
|
||||
throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
checkReservedTableName(tableName);
|
||||
try {
|
||||
this.master.deleteColumn(tableName, columnName);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the HBase instance
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void shutdown() throws IOException {
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException("master has been shut down");
|
||||
}
|
||||
|
||||
try {
|
||||
this.master.shutdown();
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
} finally {
|
||||
this.master = null;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Verifies that the specified table name is not a reserved name
|
||||
* @param tableName - the table name to be checked
|
||||
* @throws IllegalArgumentException - if the table name is reserved
|
||||
*/
|
||||
protected void checkReservedTableName(Text tableName) {
|
||||
if(tableName.equals(ROOT_TABLE_NAME)
|
||||
|| tableName.equals(META_TABLE_NAME)) {
|
||||
|
||||
throw new IllegalArgumentException(tableName + " is a reserved table name");
|
||||
}
|
||||
}
|
||||
|
||||
private HRegionLocation getFirstMetaServerForTable(Text tableName)
|
||||
throws IOException {
|
||||
SortedMap<Text, HRegionLocation> metaservers =
|
||||
connection.getTableServers(META_TABLE_NAME);
|
||||
|
||||
return metaservers.get((metaservers.containsKey(tableName)) ?
|
||||
tableName : metaservers.headMap(tableName).lastKey());
|
||||
}
|
||||
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface HConnection {
|
||||
/**
|
||||
* @return proxy connection to master server for this instance
|
||||
* @throws MasterNotRunningException
|
||||
*/
|
||||
public HMasterInterface getMaster() throws MasterNotRunningException;
|
||||
|
||||
/** @return - true if the master server is running */
|
||||
public boolean isMasterRunning();
|
||||
|
||||
/**
|
||||
* @param tableName Table to check.
|
||||
* @return True if table exists already.
|
||||
*/
|
||||
public boolean tableExists(final Text tableName);
|
||||
|
||||
/**
|
||||
* List all the userspace tables. In other words, scan the META table.
|
||||
*
|
||||
* If we wanted this to be really fast, we could implement a special
|
||||
* catalog table that just contains table names and their descriptors.
|
||||
* Right now, it only exists as part of the META table's region info.
|
||||
*
|
||||
* @return - returns an array of HTableDescriptors
|
||||
* @throws IOException
|
||||
*/
|
||||
public HTableDescriptor[] listTables() throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the servers of the given table.
|
||||
*
|
||||
* @param tableName - the table to be located
|
||||
* @return map of startRow -> RegionLocation
|
||||
* @throws IOException - if the table can not be located after retrying
|
||||
*/
|
||||
public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Reloads servers for the specified table.
|
||||
*
|
||||
* @param tableName name of table whose servers are to be reloaded
|
||||
* @return map of start key -> RegionLocation
|
||||
* @throws IOException
|
||||
*/
|
||||
public SortedMap<Text, HRegionLocation>
|
||||
reloadTableServers(final Text tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* Establishes a connection to the region server at the specified address.
|
||||
* @param regionServer - the server to connect to
|
||||
* @return proxy for HRegionServer
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegionInterface getHRegionConnection(HServerAddress regionServer)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Discard all the information about this table
|
||||
* @param tableName the name of the table to close
|
||||
*/
|
||||
public void close(Text tableName);
|
||||
}
|
|
@ -0,0 +1,761 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
|
||||
/**
|
||||
* A non-instantiable class that manages connections to multiple tables in
|
||||
* multiple HBase instances
|
||||
*/
|
||||
public class HConnectionManager implements HConstants {
|
||||
private HConnectionManager(){} // Not instantiable
|
||||
|
||||
// A Map of master HServerAddress -> connection information for that instance
|
||||
// Note that although the Map is synchronized, the objects it contains
|
||||
// are mutable and hence require synchronized access to them
|
||||
|
||||
private static final Map<String, HConnection> HBASE_INSTANCES =
|
||||
Collections.synchronizedMap(new HashMap<String, HConnection>());
|
||||
|
||||
/**
|
||||
* Get the connection object for the instance specified by the configuration
|
||||
* If no current connection exists, create a new connection for that instance
|
||||
* @param conf
|
||||
* @return HConnection object for the instance specified by the configuration
|
||||
*/
|
||||
public static HConnection getConnection(Configuration conf) {
|
||||
HConnection connection;
|
||||
synchronized (HBASE_INSTANCES) {
|
||||
String instanceName = conf.get(HBASE_DIR, DEFAULT_HBASE_DIR);
|
||||
|
||||
connection = HBASE_INSTANCES.get(instanceName);
|
||||
|
||||
if (connection == null) {
|
||||
connection = new TableServers(conf);
|
||||
HBASE_INSTANCES.put(instanceName, connection);
|
||||
}
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete connection information for the instance specified by the configuration
|
||||
* @param conf
|
||||
*/
|
||||
public static void deleteConnection(Configuration conf) {
|
||||
synchronized (HBASE_INSTANCES) {
|
||||
HBASE_INSTANCES.remove(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
|
||||
}
|
||||
}
|
||||
|
||||
/* encapsulates finding the servers for an HBase instance */
|
||||
private static class TableServers implements HConnection, HConstants {
|
||||
private final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
private final Class<? extends HRegionInterface> serverInterfaceClass;
|
||||
private final long threadWakeFrequency;
|
||||
private final long pause;
|
||||
private final int numRetries;
|
||||
|
||||
private final Integer masterLock = new Integer(0);
|
||||
private volatile HMasterInterface master;
|
||||
private volatile boolean masterChecked;
|
||||
|
||||
private final Integer rootRegionLock = new Integer(0);
|
||||
private final Integer metaRegionLock = new Integer(0);
|
||||
|
||||
private volatile Configuration conf;
|
||||
|
||||
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
|
||||
private Map<Text, SortedMap<Text, HRegionLocation>> tablesToServers;
|
||||
|
||||
// Set of closed tables
|
||||
private Set<Text> closedTables;
|
||||
|
||||
// Set of tables currently being located
|
||||
private HashSet<Text> tablesBeingLocated;
|
||||
|
||||
// Known region HServerAddress.toString() -> HRegionInterface
|
||||
private HashMap<String, HRegionInterface> servers;
|
||||
|
||||
/** constructor
|
||||
* @param conf Configuration object
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public TableServers(Configuration conf) {
|
||||
this.conf = conf;
|
||||
|
||||
String serverClassName =
|
||||
conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
|
||||
|
||||
try {
|
||||
this.serverInterfaceClass =
|
||||
(Class<? extends HRegionInterface>) Class.forName(serverClassName);
|
||||
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to find region server interface " + serverClassName, e);
|
||||
}
|
||||
|
||||
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
||||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
|
||||
this.master = null;
|
||||
this.masterChecked = false;
|
||||
|
||||
this.tablesToServers = Collections.synchronizedMap(
|
||||
new HashMap<Text, SortedMap<Text, HRegionLocation>>());
|
||||
|
||||
this.closedTables = Collections.synchronizedSet(new HashSet<Text>());
|
||||
this.tablesBeingLocated = new HashSet<Text>();
|
||||
|
||||
this.servers = new HashMap<String, HRegionInterface>();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HMasterInterface getMaster() throws MasterNotRunningException {
|
||||
synchronized (this.masterLock) {
|
||||
for (int tries = 0;
|
||||
!this.masterChecked && this.master == null && tries < numRetries;
|
||||
tries++) {
|
||||
|
||||
HServerAddress masterLocation = new HServerAddress(this.conf.get(
|
||||
MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS));
|
||||
|
||||
try {
|
||||
HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy(
|
||||
HMasterInterface.class, HMasterInterface.versionID,
|
||||
masterLocation.getInetSocketAddress(), this.conf);
|
||||
|
||||
if (tryMaster.isMasterRunning()) {
|
||||
this.master = tryMaster;
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
if(tries == numRetries - 1) {
|
||||
// This was our last chance - don't bother sleeping
|
||||
break;
|
||||
}
|
||||
LOG.info("Attempt " + tries + " of " + this.numRetries +
|
||||
" failed with <" + e + ">. Retrying after sleep of " + this.pause);
|
||||
}
|
||||
|
||||
// We either cannot connect to master or it is not running. Sleep & retry
|
||||
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
this.masterChecked = true;
|
||||
}
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException();
|
||||
}
|
||||
return this.master;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public boolean isMasterRunning() {
|
||||
if (this.master == null) {
|
||||
try {
|
||||
getMaster();
|
||||
|
||||
} catch (MasterNotRunningException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public boolean tableExists(final Text tableName) {
|
||||
boolean exists = true;
|
||||
try {
|
||||
SortedMap<Text, HRegionLocation> servers = getTableServers(tableName);
|
||||
if (servers == null || servers.size() == 0) {
|
||||
exists = false;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
exists = false;
|
||||
}
|
||||
return exists;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HTableDescriptor[] listTables() throws IOException {
|
||||
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
|
||||
|
||||
SortedMap<Text, HRegionLocation> metaTables =
|
||||
getTableServers(META_TABLE_NAME);
|
||||
|
||||
for (HRegionLocation t: metaTables.values()) {
|
||||
HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
|
||||
null);
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
while (true) {
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values.length == 0) {
|
||||
break;
|
||||
}
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[i].getData(), values[i].getData().length);
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info.readFields(inbuf);
|
||||
|
||||
// Only examine the rows where the startKey is zero length
|
||||
if (info.startKey.getLength() == 0) {
|
||||
uniqueTables.add(info.tableDesc);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RemoteException ex) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(ex);
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
server.close(scannerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public SortedMap<Text, HRegionLocation>
|
||||
getTableServers(Text tableName) throws IOException {
|
||||
|
||||
if (tableName == null || tableName.getLength() == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"table name cannot be null or zero length");
|
||||
}
|
||||
|
||||
if (closedTables.contains(tableName)) {
|
||||
throw new IllegalStateException("table closed: " + tableName);
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
tablesToServers.get(tableName);
|
||||
|
||||
if (tableServers == null ) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No servers for " + tableName + ". Doing a find...");
|
||||
}
|
||||
// We don't know where the table is.
|
||||
// Load the information from meta.
|
||||
tableServers = findServersForTable(tableName);
|
||||
}
|
||||
SortedMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
servers.putAll(tableServers);
|
||||
return servers;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public SortedMap<Text, HRegionLocation>
|
||||
reloadTableServers(final Text tableName) throws IOException {
|
||||
|
||||
if (closedTables.contains(tableName)) {
|
||||
throw new IllegalStateException("table closed: " + tableName);
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
// Reload information for the whole table
|
||||
|
||||
servers.putAll(findServersForTable(tableName));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Result of findTable: " + servers.toString());
|
||||
}
|
||||
|
||||
return servers;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HRegionInterface getHRegionConnection(
|
||||
HServerAddress regionServer) throws IOException {
|
||||
|
||||
HRegionInterface server;
|
||||
synchronized (this.servers) {
|
||||
// See if we already have a connection
|
||||
server = this.servers.get(regionServer.toString());
|
||||
|
||||
if (server == null) { // Get a connection
|
||||
long versionId = 0;
|
||||
try {
|
||||
versionId =
|
||||
serverInterfaceClass.getDeclaredField("versionID").getLong(server);
|
||||
|
||||
} catch (IllegalAccessException e) {
|
||||
// Should never happen unless visibility of versionID changes
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to open a connection to a " +
|
||||
serverInterfaceClass.getName() + " server.", e);
|
||||
|
||||
} catch (NoSuchFieldException e) {
|
||||
// Should never happen unless versionID field name changes in HRegionInterface
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to open a connection to a " +
|
||||
serverInterfaceClass.getName() + " server.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass,
|
||||
versionId, regionServer.getInetSocketAddress(), this.conf);
|
||||
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
|
||||
this.servers.put(regionServer.toString(), server);
|
||||
}
|
||||
}
|
||||
return server;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void close(Text tableName) {
|
||||
if (tableName == null || tableName.getLength() == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"table name cannot be null or zero length");
|
||||
}
|
||||
|
||||
if (closedTables.contains(tableName)) {
|
||||
throw new IllegalStateException("table closed: " + tableName);
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
tablesToServers.remove(tableName);
|
||||
|
||||
if (tableServers == null) {
|
||||
throw new IllegalArgumentException("table was not opened: " + tableName);
|
||||
}
|
||||
|
||||
closedTables.add(tableName);
|
||||
|
||||
// Shut down connections to the HRegionServers
|
||||
|
||||
synchronized (this.servers) {
|
||||
for (HRegionLocation r: tableServers.values()) {
|
||||
this.servers.remove(r.getServerAddress().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Clears the cache of all known information about the specified table and
|
||||
* locates a table by searching the META or ROOT region (as appropriate) or
|
||||
* by querying the master for the location of the root region if that is the
|
||||
* table requested.
|
||||
*
|
||||
* @param tableName - name of table to find servers for
|
||||
* @return - map of first row to table info for all regions in the table
|
||||
* @throws IOException
|
||||
*/
|
||||
private SortedMap<Text, HRegionLocation> findServersForTable(Text tableName)
|
||||
throws IOException {
|
||||
|
||||
// Wipe out everything we know about this table
|
||||
|
||||
if (this.tablesToServers.remove(tableName) != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wiping out all we know of " + tableName);
|
||||
}
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
if (tableName.equals(ROOT_TABLE_NAME)) {
|
||||
synchronized (rootRegionLock) {
|
||||
// This block guards against two threads trying to find the root
|
||||
// region at the same time. One will go do the find while the
|
||||
// second waits. The second thread will not do find.
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
this.tablesToServers.get(ROOT_TABLE_NAME);
|
||||
|
||||
if (tableServers == null) {
|
||||
tableServers = locateRootRegion();
|
||||
}
|
||||
servers.putAll(tableServers);
|
||||
}
|
||||
|
||||
} else if (tableName.equals(META_TABLE_NAME)) {
|
||||
synchronized (metaRegionLock) {
|
||||
// This block guards against two threads trying to load the meta
|
||||
// region at the same time. The first will load the meta region and
|
||||
// the second will use the value that the first one found.
|
||||
|
||||
if (tablesToServers.get(ROOT_TABLE_NAME) == null) {
|
||||
findServersForTable(ROOT_TABLE_NAME);
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
this.tablesToServers.get(META_TABLE_NAME);
|
||||
|
||||
if (tableServers == null) {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
tableServers = loadMetaFromRoot();
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries < numRetries - 1) {
|
||||
findServersForTable(ROOT_TABLE_NAME);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
servers.putAll(tableServers);
|
||||
}
|
||||
} else {
|
||||
boolean waited = false;
|
||||
synchronized (this.tablesBeingLocated) {
|
||||
// This block ensures that only one thread will actually try to
|
||||
// find a table. If a second thread comes along it will wait
|
||||
// until the first thread finishes finding the table.
|
||||
|
||||
while (this.tablesBeingLocated.contains(tableName)) {
|
||||
waited = true;
|
||||
try {
|
||||
this.tablesBeingLocated.wait(threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (!waited) {
|
||||
this.tablesBeingLocated.add(tableName);
|
||||
|
||||
} else {
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
this.tablesToServers.get(tableName);
|
||||
|
||||
if (tableServers == null) {
|
||||
throw new TableNotFoundException("table not found: " + tableName);
|
||||
}
|
||||
servers.putAll(tableServers);
|
||||
}
|
||||
}
|
||||
if (!waited) {
|
||||
try {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
boolean success = true; // assume this works
|
||||
|
||||
SortedMap<Text, HRegionLocation> metaServers =
|
||||
this.tablesToServers.get(META_TABLE_NAME);
|
||||
if (metaServers == null) {
|
||||
metaServers = findServersForTable(META_TABLE_NAME);
|
||||
}
|
||||
Text firstMetaRegion = metaServers.headMap(tableName).lastKey();
|
||||
metaServers = metaServers.tailMap(firstMetaRegion);
|
||||
|
||||
for (HRegionLocation t: metaServers.values()) {
|
||||
try {
|
||||
servers.putAll(scanOneMetaRegion(t, tableName));
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries < numRetries - 1) {
|
||||
findServersForTable(META_TABLE_NAME);
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (success) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
synchronized (this.tablesBeingLocated) {
|
||||
// Wake up the threads waiting for us to find the table
|
||||
this.tablesBeingLocated.remove(tableName);
|
||||
this.tablesBeingLocated.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.tablesToServers.put(tableName, servers);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Map.Entry<Text, HRegionLocation> e: servers.entrySet()) {
|
||||
LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() +
|
||||
" for table " + tableName);
|
||||
}
|
||||
}
|
||||
return servers;
|
||||
}
|
||||
|
||||
/*
|
||||
* Load the meta table from the root table.
|
||||
*
|
||||
* @return map of first row to TableInfo for all meta regions
|
||||
* @throws IOException
|
||||
*/
|
||||
private TreeMap<Text, HRegionLocation> loadMetaFromRoot()
|
||||
throws IOException {
|
||||
|
||||
SortedMap<Text, HRegionLocation> rootRegion =
|
||||
this.tablesToServers.get(ROOT_TABLE_NAME);
|
||||
|
||||
return scanOneMetaRegion(
|
||||
rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME);
|
||||
}
|
||||
|
||||
/*
|
||||
* Repeatedly try to find the root region by asking the master for where it is
|
||||
* @return TreeMap<Text, TableInfo> for root regin if found
|
||||
* @throws NoServerForRegionException - if the root region can not be located
|
||||
* after retrying
|
||||
* @throws IOException
|
||||
*/
|
||||
private TreeMap<Text, HRegionLocation> locateRootRegion()
|
||||
throws IOException {
|
||||
|
||||
getMaster();
|
||||
|
||||
HServerAddress rootRegionLocation = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int localTimeouts = 0;
|
||||
while (rootRegionLocation == null && localTimeouts < numRetries) {
|
||||
rootRegionLocation = master.findRootRegion();
|
||||
if (rootRegionLocation == null) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping. Waiting for root region.");
|
||||
}
|
||||
Thread.sleep(pause);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding root region.");
|
||||
}
|
||||
} catch (InterruptedException iex) {
|
||||
// continue
|
||||
}
|
||||
localTimeouts++;
|
||||
}
|
||||
}
|
||||
|
||||
if (rootRegionLocation == null) {
|
||||
throw new NoServerForRegionException(
|
||||
"Timed out trying to locate root region");
|
||||
}
|
||||
|
||||
HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
|
||||
|
||||
try {
|
||||
rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// Don't bother sleeping. We've run out of retries.
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
(RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Sleep and retry finding root region.
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Root region location changed. Sleeping.");
|
||||
}
|
||||
Thread.sleep(pause);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding root region.");
|
||||
}
|
||||
} catch (InterruptedException iex) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
rootRegionLocation = null;
|
||||
}
|
||||
|
||||
if (rootRegionLocation == null) {
|
||||
throw new NoServerForRegionException(
|
||||
"unable to locate root region server");
|
||||
}
|
||||
|
||||
TreeMap<Text, HRegionLocation> rootServer =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
rootServer.put(EMPTY_START_ROW,
|
||||
new HRegionLocation(HGlobals.rootRegionInfo, rootRegionLocation));
|
||||
|
||||
return rootServer;
|
||||
}
|
||||
|
||||
/*
|
||||
* Scans a single meta region
|
||||
* @param t the meta region we're going to scan
|
||||
* @param tableName the name of the table we're looking for
|
||||
* @return returns a map of startingRow to TableInfo
|
||||
* @throws TableNotFoundException - if table does not exist
|
||||
* @throws IllegalStateException - if table is offline
|
||||
* @throws NoServerForRegionException - if table can not be found after retrying
|
||||
* @throws IOException
|
||||
*/
|
||||
private TreeMap<Text, HRegionLocation> scanOneMetaRegion(
|
||||
final HRegionLocation t, final Text tableName) throws IOException {
|
||||
|
||||
HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
||||
TreeMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) {
|
||||
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId =
|
||||
server.openScanner(t.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
while (true) {
|
||||
HRegionInfo regionInfo = null;
|
||||
String serverAddress = null;
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values.length == 0) {
|
||||
if (servers.size() == 0) {
|
||||
// If we didn't find any servers then the table does not exist
|
||||
throw new TableNotFoundException("table '" + tableName +
|
||||
"' does not exist in " + t);
|
||||
}
|
||||
|
||||
// We found at least one server for the table and now we're done.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + servers.size() + " server(s) for " +
|
||||
"location: " + t + " for tablename " + tableName);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
byte[] bytes = null;
|
||||
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
results.put(values[i].getKey().getColumn(), values[i].getData());
|
||||
}
|
||||
regionInfo = new HRegionInfo();
|
||||
bytes = results.get(COL_REGIONINFO);
|
||||
inbuf.reset(bytes, bytes.length);
|
||||
regionInfo.readFields(inbuf);
|
||||
|
||||
if (!regionInfo.tableDesc.getName().equals(tableName)) {
|
||||
// We're done
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + servers.size() + " servers for table " +
|
||||
tableName);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (regionInfo.offLine) {
|
||||
throw new IllegalStateException("table offline: " + tableName);
|
||||
}
|
||||
|
||||
bytes = results.get(COL_SERVER);
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
// We need to rescan because the table we want is unassigned.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("no server address for " + regionInfo.toString());
|
||||
}
|
||||
servers.clear();
|
||||
break;
|
||||
}
|
||||
serverAddress = new String(bytes, UTF8_ENCODING);
|
||||
servers.put(regionInfo.startKey, new HRegionLocation(
|
||||
regionInfo, new HServerAddress(serverAddress)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) { // no retries left
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
server.close(scannerId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (servers.size() == 0 && tries == numRetries - 1) {
|
||||
throw new NoServerForRegionException("failed to find server for " +
|
||||
tableName + " after " + numRetries + " retries");
|
||||
}
|
||||
|
||||
if (servers.size() <= 0) {
|
||||
// The table is not yet being served. Sleep and retry.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping. Table " + tableName +
|
||||
" not currently being served.");
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException ie) {
|
||||
// continue
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding table " + tableName);
|
||||
}
|
||||
}
|
||||
}
|
||||
return servers;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,7 +41,8 @@ public interface HConstants {
|
|||
|
||||
/** Parameter name for master address */
|
||||
static final String MASTER_ADDRESS = "hbase.master";
|
||||
|
||||
|
||||
/** default host address */
|
||||
static final String DEFAULT_HOST = "0.0.0.0";
|
||||
|
||||
/** Default master address */
|
||||
|
@ -100,11 +101,15 @@ public interface HConstants {
|
|||
|
||||
/** The ROOT and META column family */
|
||||
static final Text COLUMN_FAMILY = new Text("info:");
|
||||
|
||||
|
||||
/** Array of meta column names */
|
||||
static final Text [] COLUMN_FAMILY_ARRAY = new Text [] {COLUMN_FAMILY};
|
||||
|
||||
/** ROOT/META column family member - contains HRegionInfo */
|
||||
static final Text COL_REGIONINFO = new Text(COLUMN_FAMILY + "regioninfo");
|
||||
|
||||
/** Array of column - contains HRegionInfo */
|
||||
static final Text[] COL_REGIONINFO_ARRAY = new Text [] {COL_REGIONINFO};
|
||||
|
||||
/** ROOT/META column family member - contains HServerAddress.toString() */
|
||||
static final Text COL_SERVER = new Text(COLUMN_FAMILY + "server");
|
||||
|
@ -114,6 +119,9 @@ public interface HConstants {
|
|||
|
||||
// Other constants
|
||||
|
||||
/** used by scanners, etc when they want to start at the beginning of a region */
|
||||
static final Text EMPTY_START_ROW = new Text();
|
||||
|
||||
/** When we encode strings, we always specify UTF8 encoding */
|
||||
static final String UTF8_ENCODING = "UTF-8";
|
||||
|
||||
|
|
|
@ -942,25 +942,29 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
|
||||
// HRegionServer is shutting down. Cancel the server's lease.
|
||||
|
||||
LOG.info("Region server " + s + ": MSG_REPORT_EXITING");
|
||||
cancelLease(s, serverLabel);
|
||||
if (cancelLease(s, serverLabel)) {
|
||||
// Only process the exit message if the server still has a lease.
|
||||
// Otherwise we could end up processing the server exit twice.
|
||||
|
||||
LOG.info("Region server " + s + ": MSG_REPORT_EXITING");
|
||||
|
||||
// Get all the regions the server was serving reassigned
|
||||
// (if we are not shutting down).
|
||||
|
||||
if (!closed) {
|
||||
for (int i = 1; i < msgs.length; i++) {
|
||||
HRegionInfo info = msgs[i].getRegionInfo();
|
||||
|
||||
if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) {
|
||||
rootRegionLocation = null;
|
||||
|
||||
} else if (info.tableDesc.getName().equals(META_TABLE_NAME)) {
|
||||
onlineMetaRegions.remove(info.getStartKey());
|
||||
// Get all the regions the server was serving reassigned
|
||||
// (if we are not shutting down).
|
||||
|
||||
if (!closed) {
|
||||
for (int i = 1; i < msgs.length; i++) {
|
||||
HRegionInfo info = msgs[i].getRegionInfo();
|
||||
|
||||
if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) {
|
||||
rootRegionLocation = null;
|
||||
|
||||
} else if (info.tableDesc.getName().equals(META_TABLE_NAME)) {
|
||||
onlineMetaRegions.remove(info.getStartKey());
|
||||
}
|
||||
|
||||
unassignedRegions.put(info.regionName, info);
|
||||
assignAttempts.put(info.regionName, Long.valueOf(0L));
|
||||
}
|
||||
|
||||
unassignedRegions.put(info.regionName, info);
|
||||
assignAttempts.put(info.regionName, Long.valueOf(0L));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1021,14 +1025,16 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
}
|
||||
|
||||
/** cancel a server's lease */
|
||||
private void cancelLease(final String serverName, final long serverLabel)
|
||||
throws IOException {
|
||||
private boolean cancelLease(final String serverName, final long serverLabel) {
|
||||
boolean leaseCancelled = false;
|
||||
if (serversToServerInfo.remove(serverName) != null) {
|
||||
// Only cancel lease once.
|
||||
// This method can be called a couple of times during shutdown.
|
||||
LOG.info("Cancelling lease for " + serverName);
|
||||
serverLeases.cancelLease(serverLabel, serverLabel);
|
||||
leaseCancelled = true;
|
||||
}
|
||||
return leaseCancelled;
|
||||
}
|
||||
|
||||
/** Process all the incoming messages from a server that's contacted us. */
|
||||
|
@ -1721,6 +1727,10 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
if (rootRegionLocation == null || !rootScanned) {
|
||||
// We can't proceed until the root region is online and has been
|
||||
// scanned
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("root region=" + rootRegionLocation.toString() +
|
||||
", rootScanned=" + rootScanned);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
metaRegionName = HGlobals.rootRegionInfo.regionName;
|
||||
|
@ -1735,6 +1745,11 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
// online message from being processed. So return false to have this
|
||||
// operation requeued.
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rootScanned=" + rootScanned + ", numberOfMetaRegions=" +
|
||||
numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
|
||||
onlineMetaRegions.size());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2474,16 +2489,15 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
*/
|
||||
public void leaseExpired() {
|
||||
LOG.info(server + " lease expired");
|
||||
|
||||
// Remove the server from the known servers list
|
||||
|
||||
HServerInfo storedInfo = serversToServerInfo.remove(server);
|
||||
if(rootRegionLocation != null
|
||||
&& rootRegionLocation.toString().equals(
|
||||
storedInfo.getServerAddress().toString())) {
|
||||
|
||||
rootRegionLocation = null;
|
||||
unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
|
||||
HGlobals.rootRegionInfo);
|
||||
assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
|
||||
}
|
||||
|
||||
// NOTE: If the server was serving the root region, we cannot reassign it
|
||||
// here because the new server will start serving the root region before
|
||||
// the PendingServerShutdown operation has a chance to split the log file.
|
||||
|
||||
try {
|
||||
msgQueue.put(new PendingServerShutdown(storedInfo));
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
/**
|
||||
* Contains the HRegionInfo for the region and the HServerAddress for the
|
||||
* HRegionServer serving the region
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class HRegionLocation implements Comparable {
|
||||
private HRegionInfo regionInfo;
|
||||
private HServerAddress serverAddress;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param regionInfo the HRegionInfo for the region
|
||||
* @param serverAddress the HServerAddress for the region server
|
||||
*/
|
||||
public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
|
||||
this.regionInfo = regionInfo;
|
||||
this.serverAddress = serverAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "address: " + this.serverAddress.toString() + ", regioninfo: " +
|
||||
this.regionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return this.compareTo(o) == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = this.regionInfo.hashCode();
|
||||
result ^= this.serverAddress.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
/** @return HRegionInfo */
|
||||
public HRegionInfo getRegionInfo(){
|
||||
return regionInfo;
|
||||
}
|
||||
|
||||
/** @return HServerAddress */
|
||||
public HServerAddress getServerAddress(){
|
||||
return serverAddress;
|
||||
}
|
||||
|
||||
//
|
||||
// Comparable
|
||||
//
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public int compareTo(Object o) {
|
||||
HRegionLocation other = (HRegionLocation) o;
|
||||
int result = this.regionInfo.compareTo(other.regionInfo);
|
||||
if(result == 0) {
|
||||
result = this.serverAddress.compareTo(other.serverAddress);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,850 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
* Used to communicate with a single HBase table
|
||||
*/
|
||||
public class HTable implements HConstants {
|
||||
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
|
||||
protected final HConnection connection;
|
||||
protected final Text tableName;
|
||||
protected final long pause;
|
||||
protected final int numRetries;
|
||||
protected Random rand;
|
||||
protected volatile SortedMap<Text, HRegionLocation> tableServers;
|
||||
protected BatchUpdate batch;
|
||||
|
||||
// For row mutation operations
|
||||
|
||||
protected volatile long currentLockId;
|
||||
protected volatile Text currentRegion;
|
||||
protected volatile HRegionInterface currentServer;
|
||||
protected volatile long clientid;
|
||||
|
||||
protected volatile boolean closed;
|
||||
|
||||
/**
|
||||
* Creates an object to access a HBase table
|
||||
*
|
||||
* @param conf configuration object
|
||||
* @param tableName name of the table
|
||||
* @throws IOException
|
||||
*/
|
||||
public HTable(Configuration conf, Text tableName) throws IOException {
|
||||
closed = true;
|
||||
this.connection = HConnectionManager.getConnection(conf);
|
||||
this.tableName = tableName;
|
||||
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
||||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
this.rand = new Random();
|
||||
tableServers = connection.getTableServers(tableName);
|
||||
this.batch = null;
|
||||
this.currentLockId = -1L;
|
||||
closed = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find region location hosting passed row using cached info
|
||||
* @param row Row to find.
|
||||
* @return Location of row.
|
||||
*/
|
||||
HRegionLocation getRegionLocation(Text row) {
|
||||
if (this.tableServers == null) {
|
||||
throw new IllegalStateException("Must open table first");
|
||||
}
|
||||
|
||||
// Only one server will have the row we are looking for
|
||||
Text serverKey = (this.tableServers.containsKey(row)) ?
|
||||
row : this.tableServers.headMap(row).lastKey();
|
||||
return this.tableServers.get(serverKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that no update is in progress
|
||||
*/
|
||||
public synchronized void checkUpdateInProgress() {
|
||||
if (batch != null || currentLockId != -1L) {
|
||||
throw new IllegalStateException("update in progress");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the starting row key for every region in the currently open table
|
||||
* @return Array of region starting row keys
|
||||
*/
|
||||
public Text[] getStartKeys() {
|
||||
if (closed) {
|
||||
throw new IllegalStateException("table is closed");
|
||||
}
|
||||
Text[] keys = new Text[tableServers.size()];
|
||||
int i = 0;
|
||||
for(Text key: tableServers.keySet()){
|
||||
keys[i++] = key;
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single value for the specified row and column
|
||||
*
|
||||
* @param row row key
|
||||
* @param column column name
|
||||
* @return value for specified row/column
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[] get(Text row, Text column) throws IOException {
|
||||
byte [] value = null;
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
value = server.get(r.getRegionInfo().getRegionName(), row, column);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the specified number of versions of the specified row and column
|
||||
*
|
||||
* @param row - row key
|
||||
* @param column - column name
|
||||
* @param numVersions - number of versions to retrieve
|
||||
* @return - array byte values
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
|
||||
byte [][] values = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
||||
numVersions);
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
|
||||
if (values != null) {
|
||||
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
||||
for (int i = 0 ; i < values.length; i++) {
|
||||
bytes.add(values[i]);
|
||||
}
|
||||
return bytes.toArray(new byte[values.length][]);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the specified number of versions of the specified row and column with
|
||||
* the specified timestamp.
|
||||
*
|
||||
* @param row - row key
|
||||
* @param column - column name
|
||||
* @param timestamp - timestamp
|
||||
* @param numVersions - number of versions to retrieve
|
||||
* @return - array of values that match the above criteria
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[][] get(Text row, Text column, long timestamp, int numVersions)
|
||||
throws IOException {
|
||||
byte [][] values = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
||||
timestamp, numVersions);
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
|
||||
if (values != null) {
|
||||
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
||||
for (int i = 0 ; i < values.length; i++) {
|
||||
bytes.add(values[i]);
|
||||
}
|
||||
return bytes.toArray(new byte[values.length][]);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the data for the specified row
|
||||
*
|
||||
* @param row - row key
|
||||
* @return - map of colums to values
|
||||
* @throws IOException
|
||||
*/
|
||||
public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
|
||||
KeyedData[] value = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
value = server.getRow(r.getRegionInfo().getRegionName(), row);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
if (value != null && value.length != 0) {
|
||||
for (int i = 0; i < value.length; i++) {
|
||||
results.put(value[i].getKey().getColumn(), value[i].getData());
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scanner on the current table starting at the specified row.
|
||||
* Return the specified columns.
|
||||
*
|
||||
* @param columns array of columns to return
|
||||
* @param startRow starting row in table to scan
|
||||
* @return scanner
|
||||
* @throws IOException
|
||||
*/
|
||||
public HScannerInterface obtainScanner(Text[] columns,
|
||||
Text startRow) throws IOException {
|
||||
|
||||
return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scanner on the current table starting at the specified row.
|
||||
* Return the specified columns.
|
||||
*
|
||||
* @param columns array of columns to return
|
||||
* @param startRow starting row in table to scan
|
||||
* @param timestamp only return results whose timestamp <= this value
|
||||
* @return scanner
|
||||
* @throws IOException
|
||||
*/
|
||||
public HScannerInterface obtainScanner(Text[] columns,
|
||||
Text startRow, long timestamp) throws IOException {
|
||||
|
||||
return obtainScanner(columns, startRow, timestamp, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scanner on the current table starting at the specified row.
|
||||
* Return the specified columns.
|
||||
*
|
||||
* @param columns array of columns to return
|
||||
* @param startRow starting row in table to scan
|
||||
* @param filter a row filter using row-key regexp and/or column data filter.
|
||||
* @return scanner
|
||||
* @throws IOException
|
||||
*/
|
||||
public HScannerInterface obtainScanner(Text[] columns,
|
||||
Text startRow, RowFilterInterface filter) throws IOException {
|
||||
|
||||
return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scanner on the current table starting at the specified row.
|
||||
* Return the specified columns.
|
||||
*
|
||||
* @param columns array of columns to return
|
||||
* @param startRow starting row in table to scan
|
||||
* @param timestamp only return results whose timestamp <= this value
|
||||
* @param filter a row filter using row-key regexp and/or column data filter.
|
||||
* @return scanner
|
||||
* @throws IOException
|
||||
*/
|
||||
public HScannerInterface obtainScanner(Text[] columns,
|
||||
Text startRow, long timestamp, RowFilterInterface filter)
|
||||
throws IOException {
|
||||
|
||||
return new ClientScanner(columns, startRow, timestamp, filter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a batch of row insertions/updates.
|
||||
*
|
||||
* No changes are committed until the call to commitBatchUpdate returns.
|
||||
* A call to abortBatchUpdate will abandon the entire batch.
|
||||
*
|
||||
* @param row name of row to be updated
|
||||
* @return lockid to be used in subsequent put, delete and commit calls
|
||||
*/
|
||||
public synchronized long startBatchUpdate(final Text row) {
|
||||
if (batch != null || currentLockId != -1L) {
|
||||
throw new IllegalStateException("update in progress");
|
||||
}
|
||||
batch = new BatchUpdate();
|
||||
return batch.startUpdate(row);
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort a batch mutation
|
||||
* @param lockid lock id returned by startBatchUpdate
|
||||
*/
|
||||
public synchronized void abortBatch(final long lockid) {
|
||||
if (batch == null) {
|
||||
throw new IllegalStateException("no batch update in progress");
|
||||
}
|
||||
if (batch.getLockid() != lockid) {
|
||||
throw new IllegalArgumentException("invalid lock id " + lockid);
|
||||
}
|
||||
batch = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize a batch mutation
|
||||
*
|
||||
* @param lockid lock id returned by startBatchUpdate
|
||||
* @throws IOException
|
||||
*/
|
||||
public void commitBatch(final long lockid) throws IOException {
|
||||
commitBatch(lockid, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize a batch mutation
|
||||
*
|
||||
* @param lockid lock id returned by startBatchUpdate
|
||||
* @param timestamp time to associate with all the changes
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void commitBatch(final long lockid, final long timestamp)
|
||||
throws IOException {
|
||||
|
||||
if (batch == null) {
|
||||
throw new IllegalStateException("no batch update in progress");
|
||||
}
|
||||
if (batch.getLockid() != lockid) {
|
||||
throw new IllegalArgumentException("invalid lock id " + lockid);
|
||||
}
|
||||
|
||||
try {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(batch.getRow());
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries < numRetries -1) {
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
|
||||
} else {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
batch = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start an atomic row insertion/update. No changes are committed until the
|
||||
* call to commit() returns. A call to abort() will abandon any updates in progress.
|
||||
*
|
||||
* Callers to this method are given a lease for each unique lockid; before the
|
||||
* lease expires, either abort() or commit() must be called. If it is not
|
||||
* called, the system will automatically call abort() on the client's behalf.
|
||||
*
|
||||
* The client can gain extra time with a call to renewLease().
|
||||
* Start an atomic row insertion or update
|
||||
*
|
||||
* @param row Name of row to start update against.
|
||||
* @return Row lockid.
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized long startUpdate(final Text row) throws IOException {
|
||||
if (currentLockId != -1L || batch != null) {
|
||||
throw new IllegalStateException("update in progress");
|
||||
}
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
IOException e = null;
|
||||
HRegionLocation info = getRegionLocation(row);
|
||||
try {
|
||||
currentServer =
|
||||
connection.getHRegionConnection(info.getServerAddress());
|
||||
|
||||
currentRegion = info.getRegionInfo().getRegionName();
|
||||
clientid = rand.nextLong();
|
||||
currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException ex) {
|
||||
e = ex;
|
||||
}
|
||||
if (tries < numRetries - 1) {
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
||||
} catch (InterruptedException ex) {
|
||||
}
|
||||
try {
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
|
||||
} catch (IOException ex) {
|
||||
e = ex;
|
||||
}
|
||||
} else {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return currentLockId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Change a value for the specified column.
|
||||
* Runs {@link #abort(long)} if exception thrown.
|
||||
*
|
||||
* @param lockid lock id returned from startUpdate
|
||||
* @param column column whose value is being set
|
||||
* @param val new value for column
|
||||
* @throws IOException
|
||||
*/
|
||||
public void put(long lockid, Text column, byte val[]) throws IOException {
|
||||
if (val == null) {
|
||||
throw new IllegalArgumentException("value cannot be null");
|
||||
}
|
||||
if (batch != null) {
|
||||
batch.put(lockid, column, val);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lockid != currentLockId) {
|
||||
throw new IllegalArgumentException("invalid lockid");
|
||||
}
|
||||
try {
|
||||
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
|
||||
val);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
||||
} catch (IOException e2) {
|
||||
LOG.warn(e2);
|
||||
}
|
||||
this.currentServer = null;
|
||||
this.currentRegion = null;
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the value for a column
|
||||
*
|
||||
* @param lockid - lock id returned from startUpdate
|
||||
* @param column - name of column whose value is to be deleted
|
||||
* @throws IOException
|
||||
*/
|
||||
public void delete(long lockid, Text column) throws IOException {
|
||||
if (batch != null) {
|
||||
batch.delete(lockid, column);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lockid != currentLockId) {
|
||||
throw new IllegalArgumentException("invalid lockid");
|
||||
}
|
||||
try {
|
||||
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
|
||||
column);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
||||
} catch(IOException e2) {
|
||||
LOG.warn(e2);
|
||||
}
|
||||
this.currentServer = null;
|
||||
this.currentRegion = null;
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort a row mutation
|
||||
*
|
||||
* @param lockid - lock id returned from startUpdate
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void abort(long lockid) throws IOException {
|
||||
if (batch != null) {
|
||||
abortBatch(lockid);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lockid != currentLockId) {
|
||||
throw new IllegalArgumentException("invalid lockid");
|
||||
}
|
||||
|
||||
try {
|
||||
try {
|
||||
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
||||
} catch (IOException e) {
|
||||
this.currentServer = null;
|
||||
this.currentRegion = null;
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
currentLockId = -1L;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize a row mutation
|
||||
*
|
||||
* @param lockid - lock id returned from startUpdate
|
||||
* @throws IOException
|
||||
*/
|
||||
public void commit(long lockid) throws IOException {
|
||||
commit(lockid, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize a row mutation
|
||||
*
|
||||
* @param lockid - lock id returned from startUpdate
|
||||
* @param timestamp - time to associate with the change
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void commit(long lockid, long timestamp) throws IOException {
|
||||
if (batch != null) {
|
||||
commitBatch(lockid, timestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lockid != currentLockId) {
|
||||
throw new IllegalArgumentException("invalid lockid");
|
||||
}
|
||||
|
||||
try {
|
||||
try {
|
||||
this.currentServer.commit(this.currentRegion, this.clientid, lockid,
|
||||
timestamp);
|
||||
|
||||
} catch (IOException e) {
|
||||
this.currentServer = null;
|
||||
this.currentRegion = null;
|
||||
if(e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
currentLockId = -1L;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew lease on update
|
||||
*
|
||||
* @param lockid - lock id returned from startUpdate
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void renewLease(long lockid) throws IOException {
|
||||
if (batch != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (lockid != currentLockId) {
|
||||
throw new IllegalArgumentException("invalid lockid");
|
||||
}
|
||||
try {
|
||||
this.currentServer.renewLease(lockid, this.clientid);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
||||
} catch (IOException e2) {
|
||||
LOG.warn(e2);
|
||||
}
|
||||
this.currentServer = null;
|
||||
this.currentRegion = null;
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements the scanner interface for the HBase client.
|
||||
* If there are multiple regions in a table, this scanner will iterate
|
||||
* through them all.
|
||||
*/
|
||||
protected class ClientScanner implements HScannerInterface {
|
||||
private final Text EMPTY_COLUMN = new Text();
|
||||
private Text[] columns;
|
||||
private Text startRow;
|
||||
private long scanTime;
|
||||
@SuppressWarnings("hiding")
|
||||
private boolean closed;
|
||||
private AtomicReferenceArray<HRegionLocation> regions;
|
||||
@SuppressWarnings("hiding")
|
||||
private int currentRegion;
|
||||
private HRegionInterface server;
|
||||
private long scannerId;
|
||||
private RowFilterInterface filter;
|
||||
|
||||
private void loadRegions() {
|
||||
Text firstServer = null;
|
||||
if (this.startRow == null || this.startRow.getLength() == 0) {
|
||||
firstServer = tableServers.firstKey();
|
||||
|
||||
} else if(tableServers.containsKey(startRow)) {
|
||||
firstServer = startRow;
|
||||
|
||||
} else {
|
||||
firstServer = tableServers.headMap(startRow).lastKey();
|
||||
}
|
||||
Collection<HRegionLocation> info =
|
||||
tableServers.tailMap(firstServer).values();
|
||||
|
||||
this.regions = new AtomicReferenceArray<HRegionLocation>(
|
||||
info.toArray(new HRegionLocation[info.size()]));
|
||||
}
|
||||
|
||||
protected ClientScanner(Text[] columns, Text startRow, long timestamp,
|
||||
RowFilterInterface filter) throws IOException {
|
||||
|
||||
this.columns = columns;
|
||||
this.startRow = startRow;
|
||||
this.scanTime = timestamp;
|
||||
this.closed = false;
|
||||
this.filter = filter;
|
||||
if (filter != null) {
|
||||
filter.validate(columns);
|
||||
}
|
||||
loadRegions();
|
||||
this.currentRegion = -1;
|
||||
this.server = null;
|
||||
this.scannerId = -1L;
|
||||
nextScanner();
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets a scanner for the next region.
|
||||
* Returns false if there are no more scanners.
|
||||
*/
|
||||
private boolean nextScanner() throws IOException {
|
||||
if (this.scannerId != -1L) {
|
||||
this.server.close(this.scannerId);
|
||||
this.scannerId = -1L;
|
||||
}
|
||||
this.currentRegion += 1;
|
||||
if (this.currentRegion == this.regions.length()) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = this.regions.get(currentRegion);
|
||||
this.server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
if (this.filter == null) {
|
||||
this.scannerId =
|
||||
this.server.openScanner(r.getRegionInfo().getRegionName(),
|
||||
this.columns, currentRegion == 0 ? this.startRow
|
||||
: EMPTY_START_ROW, scanTime, null);
|
||||
|
||||
} else {
|
||||
this.scannerId =
|
||||
this.server.openScanner(r.getRegionInfo().getRegionName(),
|
||||
this.columns, currentRegion == 0 ? this.startRow
|
||||
: EMPTY_START_ROW, scanTime, filter);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
loadRegions();
|
||||
}
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
close();
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
|
||||
if (this.closed) {
|
||||
return false;
|
||||
}
|
||||
KeyedData[] values = null;
|
||||
do {
|
||||
values = this.server.next(this.scannerId);
|
||||
} while (values != null && values.length == 0 && nextScanner());
|
||||
|
||||
if (values != null && values.length != 0) {
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
key.setRow(values[i].getKey().getRow());
|
||||
key.setVersion(values[i].getKey().getTimestamp());
|
||||
key.setColumn(EMPTY_COLUMN);
|
||||
results.put(values[i].getKey().getColumn(), values[i].getData());
|
||||
}
|
||||
}
|
||||
return values == null ? false : values.length != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
if (this.scannerId != -1L) {
|
||||
this.server.close(this.scannerId);
|
||||
this.scannerId = -1L;
|
||||
}
|
||||
this.server = null;
|
||||
this.closed = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -180,10 +180,8 @@ public class Leases {
|
|||
*
|
||||
* @param holderId id of lease holder
|
||||
* @param resourceId id of resource being leased
|
||||
* @throws IOException
|
||||
*/
|
||||
public void cancelLease(final long holderId, final long resourceId)
|
||||
throws IOException {
|
||||
public void cancelLease(final long holderId, final long resourceId) {
|
||||
LeaseName name = null;
|
||||
synchronized(leases) {
|
||||
synchronized(sortedLeases) {
|
||||
|
@ -191,9 +189,8 @@ public class Leases {
|
|||
Lease lease = leases.get(name);
|
||||
if (lease == null) {
|
||||
// It's possible that someone tries to renew the lease, but
|
||||
// it just expired a moment ago. So fail.
|
||||
throw new IOException("Cannot cancel lease that is not held: " +
|
||||
name);
|
||||
// it just expired a moment ago. So just skip it.
|
||||
return;
|
||||
}
|
||||
sortedLeases.remove(lease);
|
||||
leases.remove(name);
|
||||
|
@ -206,6 +203,7 @@ public class Leases {
|
|||
|
||||
/** LeaseMonitor is a thread that expires Leases that go on too long. */
|
||||
class LeaseMonitor implements Runnable {
|
||||
/** {@inheritDoc} */
|
||||
public void run() {
|
||||
while(running) {
|
||||
synchronized(leases) {
|
||||
|
@ -236,6 +234,7 @@ public class Leases {
|
|||
* A Lease name.
|
||||
* More lightweight than String or Text.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
class LeaseName implements Comparable {
|
||||
private final long holderId;
|
||||
private final long resourceId;
|
||||
|
@ -245,6 +244,7 @@ public class Leases {
|
|||
this.resourceId = rid;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
LeaseName other = (LeaseName)obj;
|
||||
|
@ -252,6 +252,7 @@ public class Leases {
|
|||
this.resourceId == other.resourceId;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public int hashCode() {
|
||||
// Copy OR'ing from javadoc for Long#hashCode.
|
||||
|
@ -260,12 +261,14 @@ public class Leases {
|
|||
return result;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public String toString() {
|
||||
return Long.toString(this.holderId) + "/" +
|
||||
Long.toString(this.resourceId);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public int compareTo(Object obj) {
|
||||
LeaseName other = (LeaseName)obj;
|
||||
if (this.holderId < other.holderId) {
|
||||
|
@ -292,6 +295,7 @@ public class Leases {
|
|||
}
|
||||
|
||||
/** This class tracks a single Lease. */
|
||||
@SuppressWarnings("unchecked")
|
||||
private class Lease implements Comparable {
|
||||
final long holderId;
|
||||
final long resourceId;
|
||||
|
@ -329,11 +333,13 @@ public class Leases {
|
|||
listener.leaseExpired();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return compareTo(obj) == 0;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = this.getLeaseName().hashCode();
|
||||
|
@ -345,6 +351,7 @@ public class Leases {
|
|||
// Comparable
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public int compareTo(Object o) {
|
||||
Lease other = (Lease) o;
|
||||
if(this.lastUpdate < other.lastUpdate) {
|
||||
|
|
|
@ -53,6 +53,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
this.regionServers = 1;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
@ -60,11 +61,13 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (this.cluster != null) {
|
||||
this.cluster.shutdown();
|
||||
}
|
||||
HConnectionManager.deleteConnection(conf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
* Tests region server failover when a region server exits.
|
||||
|
@ -36,6 +38,8 @@ public class TestCleanRegionServerExit extends HBaseClusterTestCase {
|
|||
conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
|
||||
conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
|
||||
conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries
|
||||
Logger.getRootLogger().setLevel(Level.WARN);
|
||||
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -38,7 +38,8 @@ public class TestRegionServerAbort extends HBaseClusterTestCase {
|
|||
conf.setInt("ipc.client.timeout", 5000); // reduce client timeout
|
||||
conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
|
||||
conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries
|
||||
// Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
|
||||
Logger.getRootLogger().setLevel(Level.WARN);
|
||||
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
// Setup colkeys to be inserted
|
||||
HTableDescriptor htd = new HTableDescriptor(getName());
|
||||
Text tableName = new Text(getName());
|
||||
Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1];
|
||||
Text[] colKeys = new Text[(LAST_COLKEY - FIRST_COLKEY) + 1];
|
||||
for (char i = 0; i < colKeys.length; i++) {
|
||||
colKeys[i] = new Text(new String(new char[] {
|
||||
(char)(FIRST_COLKEY + i), ':' }));
|
||||
|
@ -201,9 +201,9 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
long scannerId = -1L;
|
||||
try {
|
||||
client.openTable(table);
|
||||
HClient.RegionLocation rl = client.getRegionLocation(table);
|
||||
regionServer = client.getHRegionConnection(rl.serverAddress);
|
||||
scannerId = regionServer.openScanner(rl.regionInfo.regionName,
|
||||
HRegionLocation rl = client.getRegionLocation(table);
|
||||
regionServer = client.getHRegionConnection(rl.getServerAddress());
|
||||
scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(),
|
||||
HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null);
|
||||
while (true) {
|
||||
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
|
|
Loading…
Reference in New Issue