HBASE-5058 Allow HBaseAmin to use an existing connection (Lars H)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1220989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2011-12-19 23:01:26 +00:00
parent e0bd180675
commit 2062ea1d00
3 changed files with 69 additions and 13 deletions

View File

@ -95,7 +95,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public HBaseAdmin(Configuration c) public HBaseAdmin(Configuration c)
throws MasterNotRunningException, ZooKeeperConnectionException { throws MasterNotRunningException, ZooKeeperConnectionException {
this.conf = HBaseConfiguration.create(c); this.conf = HBaseConfiguration.create(c);
this.connection = HConnectionManager.getConnection(this.conf); this.connection = HConnectionManager.getConnection(this.conf);
this.pause = this.conf.getLong("hbase.client.pause", 1000); this.pause = this.conf.getLong("hbase.client.pause", 1000);
this.numRetries = this.conf.getInt("hbase.client.retries.number", 10); this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
this.retryLongerMultiplier = this.conf.getInt( this.retryLongerMultiplier = this.conf.getInt(
@ -111,9 +111,6 @@ public class HBaseAdmin implements Abortable, Closeable {
} catch (MasterNotRunningException mnre) { } catch (MasterNotRunningException mnre) {
HConnectionManager.deleteStaleConnection(this.connection); HConnectionManager.deleteStaleConnection(this.connection);
this.connection = HConnectionManager.getConnection(this.conf); this.connection = HConnectionManager.getConnection(this.conf);
} catch (UndeclaredThrowableException ute) {
HConnectionManager.deleteStaleConnection(this.connection);
this.connection = HConnectionManager.getConnection(this.conf);
} }
tries++; tries++;
@ -135,6 +132,29 @@ public class HBaseAdmin implements Abortable, Closeable {
} }
} }
/**
* Constructor for externally managed HConnections.
* This constructor fails fast if the HMaster is not running.
* The HConnection can be re-used again in another attempt.
* This constructor fails fast.
*
* @param connection The HConnection instance to use
* @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
*/
public HBaseAdmin(HConnection connection)
throws MasterNotRunningException, ZooKeeperConnectionException {
this.conf = connection.getConfiguration();
this.connection = connection;
this.pause = this.conf.getLong("hbase.client.pause", 1000);
this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
this.retryLongerMultiplier = this.conf.getInt(
"hbase.client.retries.longer.multiplier", 10);
this.connection.getMaster();
}
/** /**
* @return A new CatalogTracker instance; call {@link #cleanupCatalogTracker(CatalogTracker)} * @return A new CatalogTracker instance; call {@link #cleanupCatalogTracker(CatalogTracker)}
* to cleanup the returned catalog tracker. * to cleanup the returned catalog tracker.

View File

@ -623,14 +623,20 @@ public class HConnectionManager {
throws MasterNotRunningException, ZooKeeperConnectionException { throws MasterNotRunningException, ZooKeeperConnectionException {
// Check if we already have a good master connection // Check if we already have a good master connection
if (master != null) { try {
if (master.isMasterRunning()) { if (master != null && master.isMasterRunning()) {
return master; return master;
} }
} catch (UndeclaredThrowableException ute) {
// log, but ignore, the loop below will attempt to reconnect
LOG.info("Exception contacting master. Retrying...", ute.getCause());
} }
checkIfBaseNodeAvailable(); checkIfBaseNodeAvailable();
ServerName sn = null; ServerName sn = null;
synchronized (this.masterLock) { synchronized (this.masterLock) {
this.master = null;
for (int tries = 0; for (int tries = 0;
!this.closed && !this.closed &&
!this.masterChecked && this.master == null && !this.masterChecked && this.master == null &&
@ -679,15 +685,19 @@ public class HConnectionManager {
throw new RuntimeException("Thread was interrupted while trying to connect to master."); throw new RuntimeException("Thread was interrupted while trying to connect to master.");
} }
} }
this.masterChecked = true; // Avoid re-checking in the future if this is a managed HConnection,
} // even if we failed to acquire a master.
if (this.master == null) { // (this is to retain the existing behavior before HBASE-5058)
if (sn == null) { this.masterChecked = managed;
throw new MasterNotRunningException();
if (this.master == null) {
if (sn == null) {
throw new MasterNotRunningException();
}
throw new MasterNotRunningException(sn.toString());
} }
throw new MasterNotRunningException(sn.toString()); return this.master;
} }
return this.master;
} }
private void checkIfBaseNodeAvailable() throws MasterNotRunningException { private void checkIfBaseNodeAvailable() throws MasterNotRunningException {

View File

@ -44,6 +44,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -67,6 +69,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -3851,6 +3854,29 @@ public class TestFromClientSide {
} }
} }
/**
* simple test that just executes parts of the client
* API that accept a pre-created HConnction instance
*
* @throws IOException
*/
@Test
public void testUnmanagedHConnection() throws IOException {
final byte[] tableName = Bytes.toBytes("testUnmanagedHConnection");
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
HConnection conn = HConnectionManager.createConnection(TEST_UTIL
.getConfiguration());
ExecutorService pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory());
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
HTable t = new HTable(tableName, conn, pool);
HBaseAdmin ha = new HBaseAdmin(conn);
assertTrue(ha.tableExists(tableName));
assertTrue(t.get(new Get(ROW)).isEmpty());
}
@Test @Test
public void testMiscHTableStuff() throws IOException { public void testMiscHTableStuff() throws IOException {
final byte[] tableAname = Bytes.toBytes("testMiscHTableStuffA"); final byte[] tableAname = Bytes.toBytes("testMiscHTableStuffA");