HBASE-5399 Cut the link between the client and the zookeeper ensemble
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1299872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d04201683a
commit
e4fefa0073
|
@ -51,4 +51,8 @@ public class MasterNotRunningException extends IOException {
|
||||||
public MasterNotRunningException(Exception e) {
|
public MasterNotRunningException(Exception e) {
|
||||||
super(e);
|
super(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MasterNotRunningException(String s, Exception e) {
|
||||||
|
super(s, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,17 +19,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -46,7 +35,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RegionException;
|
import org.apache.hadoop.hbase.RegionException;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableExistsException;
|
import org.apache.hadoop.hbase.TableExistsException;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
@ -64,6 +52,18 @@ import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides an interface to manage HBase database table metadata + general
|
* Provides an interface to manage HBase database table metadata + general
|
||||||
|
@ -77,86 +77,54 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class HBaseAdmin implements Abortable, Closeable {
|
public class HBaseAdmin implements Abortable, Closeable {
|
||||||
private final Log LOG = LogFactory.getLog(this.getClass().getName());
|
private static final Log LOG = LogFactory.getLog(HBaseAdmin.class);
|
||||||
// private final HConnection connection;
|
|
||||||
private HConnection connection;
|
// We use the implementation class rather then the interface because we
|
||||||
|
// need the package protected functions to get the connection to master
|
||||||
|
private HConnectionManager.HConnectionImplementation connection;
|
||||||
|
|
||||||
private volatile Configuration conf;
|
private volatile Configuration conf;
|
||||||
private final long pause;
|
private final long pause;
|
||||||
private final int numRetries;
|
private final int numRetries;
|
||||||
// Some operations can take a long time such as disable of big table.
|
// Some operations can take a long time such as disable of big table.
|
||||||
// numRetries is for 'normal' stuff... Mutliply by this factor when
|
// numRetries is for 'normal' stuff... Multiply by this factor when
|
||||||
// want to wait a long time.
|
// want to wait a long time.
|
||||||
private final int retryLongerMultiplier;
|
private final int retryLongerMultiplier;
|
||||||
private boolean aborted;
|
private boolean aborted;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor.
|
||||||
|
* See {@link #HBaseAdmin(HConnection connection)}
|
||||||
*
|
*
|
||||||
* @param c Configuration object
|
* @param c Configuration object. Copied internally.
|
||||||
* @throws MasterNotRunningException if the master is not running
|
|
||||||
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
|
||||||
*/
|
*/
|
||||||
public HBaseAdmin(Configuration c)
|
public HBaseAdmin(Configuration c)
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
this.conf = HBaseConfiguration.create(c);
|
// Will not leak connections, as the new implementation of the constructor
|
||||||
this.connection = HConnectionManager.getConnection(this.conf);
|
// does not throw exceptions anymore.
|
||||||
this.pause = this.conf.getLong("hbase.client.pause", 1000);
|
this(HConnectionManager.getConnection(new Configuration(c)));
|
||||||
this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
|
|
||||||
this.retryLongerMultiplier = this.conf.getInt(
|
|
||||||
"hbase.client.retries.longer.multiplier", 10);
|
|
||||||
|
|
||||||
int tries = 0;
|
|
||||||
while ( true ){
|
|
||||||
try {
|
|
||||||
|
|
||||||
this.connection.getMaster();
|
|
||||||
return;
|
|
||||||
|
|
||||||
} catch (MasterNotRunningException mnre) {
|
|
||||||
HConnectionManager.deleteStaleConnection(this.connection);
|
|
||||||
this.connection = HConnectionManager.getConnection(this.conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
tries++;
|
|
||||||
if (tries >= numRetries) {
|
|
||||||
// we should delete connection between client and zookeeper
|
|
||||||
HConnectionManager.deleteStaleConnection(this.connection);
|
|
||||||
throw new MasterNotRunningException("Retried " + numRetries + " times");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.sleep(getPauseTime(tries));
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
// we should delete connection between client and zookeeper
|
|
||||||
HConnectionManager.deleteStaleConnection(this.connection);
|
|
||||||
throw new MasterNotRunningException(
|
|
||||||
"Interrupted after "+tries+" tries");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for externally managed HConnections.
|
* Constructor for externally managed HConnections.
|
||||||
* This constructor fails fast if the HMaster is not running.
|
* The connection to master will be created when required by admin functions.
|
||||||
* The HConnection can be re-used again in another attempt.
|
*
|
||||||
* This constructor fails fast.
|
* @param connection The HConnection instance to use
|
||||||
*
|
* @throws MasterNotRunningException, ZooKeeperConnectionException are not
|
||||||
* @param connection The HConnection instance to use
|
* thrown anymore but kept into the interface for backward api compatibility
|
||||||
* @throws MasterNotRunningException if the master is not running
|
*/
|
||||||
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
|
||||||
*/
|
|
||||||
public HBaseAdmin(HConnection connection)
|
public HBaseAdmin(HConnection connection)
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
this.conf = connection.getConfiguration();
|
this.conf = connection.getConfiguration();
|
||||||
this.connection = connection;
|
|
||||||
|
// We want the real class, without showing it our public interface,
|
||||||
|
// hence the cast.
|
||||||
|
this.connection = (HConnectionManager.HConnectionImplementation)connection;
|
||||||
|
|
||||||
this.pause = this.conf.getLong("hbase.client.pause", 1000);
|
this.pause = this.conf.getLong("hbase.client.pause", 1000);
|
||||||
this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
|
this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
|
||||||
this.retryLongerMultiplier = this.conf.getInt(
|
this.retryLongerMultiplier = this.conf.getInt(
|
||||||
"hbase.client.retries.longer.multiplier", 10);
|
"hbase.client.retries.longer.multiplier", 10);
|
||||||
|
|
||||||
this.connection.getMaster();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -206,18 +174,25 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @return proxy connection to master server for this instance
|
* @return proxy connection to master server for this instance
|
||||||
* @throws MasterNotRunningException if the master is not running
|
* @throws MasterNotRunningException if the master is not running
|
||||||
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
||||||
|
* @deprecated Master is an implementation detail for HBaseAdmin.
|
||||||
|
* Deprecated in HBase 0.94
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public HMasterInterface getMaster()
|
public HMasterInterface getMaster()
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
return this.connection.getMaster();
|
// We take a shared master, but we will never release it,
|
||||||
|
// so we will have the same behavior as before.
|
||||||
|
return this.connection.getKeepAliveMaster();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return - true if the master server is running
|
/** @return - true if the master server is running. Throws an exception
|
||||||
|
* otherwise.
|
||||||
* @throws ZooKeeperConnectionException
|
* @throws ZooKeeperConnectionException
|
||||||
* @throws MasterNotRunningException */
|
* @throws MasterNotRunningException
|
||||||
public boolean isMasterRunning()
|
*/
|
||||||
|
public boolean isMasterRunning()
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
return this.connection.isMasterRunning();
|
return connection.isMasterRunning();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -464,7 +439,8 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* and attempt-at-creation).
|
* and attempt-at-creation).
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void createTableAsync(HTableDescriptor desc, byte [][] splitKeys)
|
public void createTableAsync(
|
||||||
|
final HTableDescriptor desc, final byte [][] splitKeys)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTableDescriptor.isLegalTableName(desc.getName());
|
HTableDescriptor.isLegalTableName(desc.getName());
|
||||||
if(splitKeys != null && splitKeys.length > 1) {
|
if(splitKeys != null && splitKeys.length > 1) {
|
||||||
|
@ -480,11 +456,14 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
lastKey = splitKey;
|
lastKey = splitKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
getMaster().createTable(desc, splitKeys);
|
execute(new MasterCallable<Void>() {
|
||||||
} catch (RemoteException e) {
|
@Override
|
||||||
throw e.unwrapRemoteException();
|
public Void call() throws IOException {
|
||||||
}
|
master.createTable(desc, splitKeys);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -506,14 +485,17 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public void deleteTable(final byte [] tableName) throws IOException {
|
public void deleteTable(final byte [] tableName) throws IOException {
|
||||||
isMasterRunning();
|
|
||||||
HTableDescriptor.isLegalTableName(tableName);
|
HTableDescriptor.isLegalTableName(tableName);
|
||||||
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
|
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
|
||||||
try {
|
|
||||||
getMaster().deleteTable(tableName);
|
execute(new MasterCallable<Void>() {
|
||||||
} catch (RemoteException e) {
|
@Override
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
public Void call() throws IOException {
|
||||||
}
|
master.deleteTable(tableName);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Wait until all regions deleted
|
// Wait until all regions deleted
|
||||||
HRegionInterface server =
|
HRegionInterface server =
|
||||||
connection.getHRegionConnection(firstMetaServer.getHostname(), firstMetaServer.getPort());
|
connection.getHRegionConnection(firstMetaServer.getHostname(), firstMetaServer.getPort());
|
||||||
|
@ -533,7 +515,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
// HMaster removes the table from its HTableDescriptors
|
// HMaster removes the table from its HTableDescriptors
|
||||||
if (values == null) {
|
if (values == null) {
|
||||||
boolean tableExists = false;
|
boolean tableExists = false;
|
||||||
HTableDescriptor[] htds = getMaster().getHTableDescriptors();
|
HTableDescriptor[] htds;
|
||||||
|
MasterKeepAliveConnection master = connection.getKeepAliveMaster();
|
||||||
|
try {
|
||||||
|
htds = master.getHTableDescriptors();
|
||||||
|
} finally {
|
||||||
|
master.close();
|
||||||
|
}
|
||||||
if (htds != null && htds.length > 0) {
|
if (htds != null && htds.length > 0) {
|
||||||
for (HTableDescriptor htd: htds) {
|
for (HTableDescriptor htd: htds) {
|
||||||
if (Bytes.equals(tableName, htd.getName())) {
|
if (Bytes.equals(tableName, htd.getName())) {
|
||||||
|
@ -549,15 +537,16 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
if(tries == numRetries - 1) { // no more tries left
|
if(tries == numRetries - 1) { // no more tries left
|
||||||
if (ex instanceof RemoteException) {
|
if (ex instanceof RemoteException) {
|
||||||
ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex);
|
throw ((RemoteException) ex).unwrapRemoteException();
|
||||||
|
}else {
|
||||||
|
throw ex;
|
||||||
}
|
}
|
||||||
throw ex;
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (scannerId != -1L) {
|
if (scannerId != -1L) {
|
||||||
try {
|
try {
|
||||||
server.close(scannerId);
|
server.close(scannerId);
|
||||||
} catch (Exception ex) {
|
} catch (IOException ex) {
|
||||||
LOG.warn(ex);
|
LOG.warn(ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -682,13 +671,14 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void enableTableAsync(final byte [] tableName)
|
public void enableTableAsync(final byte [] tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
isMasterRunning();
|
execute(new MasterCallable<Void>() {
|
||||||
try {
|
@Override
|
||||||
getMaster().enableTable(tableName);
|
public Void call() throws IOException {
|
||||||
} catch (RemoteException e) {
|
LOG.info("Started enable of " + Bytes.toString(tableName));
|
||||||
throw e.unwrapRemoteException();
|
master.enableTable(tableName);
|
||||||
}
|
return null;
|
||||||
LOG.info("Started enable of " + Bytes.toString(tableName));
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -750,13 +740,14 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @since 0.90.0
|
* @since 0.90.0
|
||||||
*/
|
*/
|
||||||
public void disableTableAsync(final byte [] tableName) throws IOException {
|
public void disableTableAsync(final byte [] tableName) throws IOException {
|
||||||
isMasterRunning();
|
execute(new MasterCallable<Void>() {
|
||||||
try {
|
@Override
|
||||||
getMaster().disableTable(tableName);
|
public Void call() throws IOException {
|
||||||
} catch (RemoteException e) {
|
LOG.info("Started disable of " + Bytes.toString(tableName));
|
||||||
throw e.unwrapRemoteException();
|
master.disableTable(tableName);
|
||||||
}
|
return null;
|
||||||
LOG.info("Started disable of " + Bytes.toString(tableName));
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void disableTable(final String tableName)
|
public void disableTable(final String tableName)
|
||||||
|
@ -919,11 +910,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
|
public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTableDescriptor.isLegalTableName(tableName);
|
HTableDescriptor.isLegalTableName(tableName);
|
||||||
try {
|
return execute(new MasterCallable<Pair<Integer, Integer>>() {
|
||||||
return getMaster().getAlterStatus(tableName);
|
@Override
|
||||||
} catch (RemoteException e) {
|
public Pair<Integer, Integer> call() throws IOException {
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
return master.getAlterStatus(tableName);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -947,14 +939,15 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @param column column descriptor of column to be added
|
* @param column column descriptor of column to be added
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public void addColumn(final byte [] tableName, HColumnDescriptor column)
|
public void addColumn(final byte [] tableName, final HColumnDescriptor column)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTableDescriptor.isLegalTableName(tableName);
|
execute(new MasterCallable<Void>() {
|
||||||
try {
|
@Override
|
||||||
getMaster().addColumn(tableName, column);
|
public Void call() throws IOException {
|
||||||
} catch (RemoteException e) {
|
master.addColumn(tableName, column);
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
return null;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -980,11 +973,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void deleteColumn(final byte [] tableName, final byte [] columnName)
|
public void deleteColumn(final byte [] tableName, final byte [] columnName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
execute(new MasterCallable<Void>() {
|
||||||
getMaster().deleteColumn(tableName, columnName);
|
@Override
|
||||||
} catch (RemoteException e) {
|
public Void call() throws IOException {
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
master.deleteColumn(tableName, columnName);
|
||||||
}
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1000,6 +995,8 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
modifyColumn(Bytes.toBytes(tableName), descriptor);
|
modifyColumn(Bytes.toBytes(tableName), descriptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modify an existing column family on a table.
|
* Modify an existing column family on a table.
|
||||||
* Asynchronous operation.
|
* Asynchronous operation.
|
||||||
|
@ -1008,16 +1005,15 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @param descriptor new column descriptor to use
|
* @param descriptor new column descriptor to use
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor)
|
public void modifyColumn(final byte [] tableName, final HColumnDescriptor descriptor)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
execute(new MasterCallable<Void>() {
|
||||||
getMaster().modifyColumn(tableName, descriptor);
|
@Override
|
||||||
} catch (RemoteException re) {
|
public Void call() throws IOException {
|
||||||
// Convert RE exceptions in here; client shouldn't have to deal with them,
|
master.modifyColumn(tableName, descriptor);
|
||||||
// at least w/ the type of exceptions that come out of this method:
|
return null;
|
||||||
// TableNotFoundException, etc.
|
}
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(re);
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1311,7 +1307,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void move(final byte [] encodedRegionName, final byte [] destServerName)
|
public void move(final byte [] encodedRegionName, final byte [] destServerName)
|
||||||
throws UnknownRegionException, MasterNotRunningException, ZooKeeperConnectionException {
|
throws UnknownRegionException, MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
getMaster().move(encodedRegionName, destServerName);
|
MasterKeepAliveConnection master = connection.getKeepAliveMaster();
|
||||||
|
try {
|
||||||
|
master.move(encodedRegionName, destServerName);
|
||||||
|
} finally {
|
||||||
|
master.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1323,7 +1324,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void assign(final byte[] regionName) throws MasterNotRunningException,
|
public void assign(final byte[] regionName) throws MasterNotRunningException,
|
||||||
ZooKeeperConnectionException, IOException {
|
ZooKeeperConnectionException, IOException {
|
||||||
getMaster().assign(regionName);
|
execute(new MasterCallable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
master.assign(regionName);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1342,7 +1349,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void unassign(final byte [] regionName, final boolean force)
|
public void unassign(final byte [] regionName, final boolean force)
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
|
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
|
||||||
getMaster().unassign(regionName, force);
|
execute(new MasterCallable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
master.unassign(regionName, force);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1352,7 +1365,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public boolean balanceSwitch(final boolean b)
|
public boolean balanceSwitch(final boolean b)
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
return getMaster().balanceSwitch(b);
|
MasterKeepAliveConnection master = connection.getKeepAliveMaster();
|
||||||
|
try {
|
||||||
|
return master.balanceSwitch(b);
|
||||||
|
} finally {
|
||||||
|
master.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1363,7 +1381,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public boolean balancer()
|
public boolean balancer()
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
return getMaster().balance();
|
MasterKeepAliveConnection master = connection.getKeepAliveMaster();
|
||||||
|
try {
|
||||||
|
return master.balance();
|
||||||
|
} finally {
|
||||||
|
master.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1458,24 +1481,23 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @param htd modified description of the table
|
* @param htd modified description of the table
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public void modifyTable(final byte [] tableName, HTableDescriptor htd)
|
public void modifyTable(final byte [] tableName, final HTableDescriptor htd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
execute(new MasterCallable<Void>() {
|
||||||
getMaster().modifyTable(tableName, htd);
|
@Override
|
||||||
} catch (RemoteException re) {
|
public Void call() throws IOException {
|
||||||
// Convert RE exceptions in here; client shouldn't have to deal with them,
|
master.modifyTable(tableName, htd);
|
||||||
// at least w/ the type of exceptions that come out of this method:
|
return null;
|
||||||
// TableNotFoundException, etc.
|
}
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(re);
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param tableNameOrRegionName Name of a table or name of a region.
|
* @param tableNameOrRegionName Name of a table or name of a region.
|
||||||
* @param ct A {@link #CatalogTracker} instance (caller of this method usually has one).
|
* @param ct A {@link CatalogTracker} instance (caller of this method usually has one).
|
||||||
* @return True if <code>tableNameOrRegionName</code> is a verified region
|
* @return True if <code>tableNameOrRegionName</code> is a verified region
|
||||||
* name (we call {@link #MetaReader.getRegion(CatalogTracker catalogTracker,
|
* name (we call {@link MetaReader#getRegion( CatalogTracker, byte[])}
|
||||||
* byte [] regionName)};) else false.
|
* else false.
|
||||||
* Throw an exception if <code>tableNameOrRegionName</code> is null.
|
* Throw an exception if <code>tableNameOrRegionName</code> is null.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@ -1492,7 +1514,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* Convert the table name byte array into a table name string and check if table
|
* Convert the table name byte array into a table name string and check if table
|
||||||
* exists or not.
|
* exists or not.
|
||||||
* @param tableNameBytes Name of a table.
|
* @param tableNameBytes Name of a table.
|
||||||
* @param ct A {@link #CatalogTracker} instance (caller of this method usually has one).
|
* @param ct A {@link CatalogTracker} instance (caller of this method usually has one).
|
||||||
* @return tableName in string form.
|
* @return tableName in string form.
|
||||||
* @throws IOException if a remote or network exception occurs.
|
* @throws IOException if a remote or network exception occurs.
|
||||||
* @throws TableNotFoundException if table does not exist.
|
* @throws TableNotFoundException if table does not exist.
|
||||||
|
@ -1511,12 +1533,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public synchronized void shutdown() throws IOException {
|
public synchronized void shutdown() throws IOException {
|
||||||
isMasterRunning();
|
execute(new MasterCallable<Void>() {
|
||||||
try {
|
@Override
|
||||||
getMaster().shutdown();
|
public Void call() throws IOException {
|
||||||
} catch (RemoteException e) {
|
master.shutdown();
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
return null;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1526,12 +1549,13 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public synchronized void stopMaster() throws IOException {
|
public synchronized void stopMaster() throws IOException {
|
||||||
isMasterRunning();
|
execute(new MasterCallable<Void>() {
|
||||||
try {
|
@Override
|
||||||
getMaster().stopMaster();
|
public Void call() throws IOException {
|
||||||
} catch (RemoteException e) {
|
master.stopMaster();
|
||||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
return null;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1554,7 +1578,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
public ClusterStatus getClusterStatus() throws IOException {
|
public ClusterStatus getClusterStatus() throws IOException {
|
||||||
return getMaster().getClusterStatus();
|
return execute(new MasterCallable<ClusterStatus>() {
|
||||||
|
@Override
|
||||||
|
public ClusterStatus call() {
|
||||||
|
return master.getClusterStatus();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private HRegionLocation getFirstMetaServerForTable(final byte [] tableName)
|
private HRegionLocation getFirstMetaServerForTable(final byte [] tableName)
|
||||||
|
@ -1572,20 +1601,60 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check to see if HBase is running. Throw an exception if not.
|
* Check to see if HBase is running. Throw an exception if not.
|
||||||
|
* We consider that HBase is running if ZooKeeper and Master are running.
|
||||||
*
|
*
|
||||||
* @param conf system configuration
|
* @param conf system configuration
|
||||||
* @throws MasterNotRunningException if the master is not running
|
* @throws MasterNotRunningException if the master is not running
|
||||||
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
||||||
*/
|
*/
|
||||||
public static void checkHBaseAvailable(Configuration conf)
|
public static void checkHBaseAvailable(Configuration conf)
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
Configuration copyOfConf = HBaseConfiguration.create(conf);
|
Configuration copyOfConf = HBaseConfiguration.create(conf);
|
||||||
|
|
||||||
|
// We set it to make it fail as soon as possible if HBase is not available
|
||||||
copyOfConf.setInt("hbase.client.retries.number", 1);
|
copyOfConf.setInt("hbase.client.retries.number", 1);
|
||||||
HBaseAdmin admin = new HBaseAdmin(copyOfConf);
|
copyOfConf.setInt("zookeeper.recovery.retry", 0);
|
||||||
|
|
||||||
|
HConnectionManager.HConnectionImplementation connection
|
||||||
|
= (HConnectionManager.HConnectionImplementation)
|
||||||
|
HConnectionManager.getConnection(copyOfConf);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
admin.close();
|
// Check ZK first.
|
||||||
} catch (IOException ioe) {
|
// If the connection exists, we may have a connection to ZK that does
|
||||||
admin.LOG.info("Failed to close connection", ioe);
|
// not work anymore
|
||||||
|
ZooKeeperKeepAliveConnection zkw = null;
|
||||||
|
try {
|
||||||
|
zkw = connection.getKeepAliveZooKeeperWatcher();
|
||||||
|
zkw.getRecoverableZooKeeper().getZooKeeper().exists(
|
||||||
|
zkw.baseZNode, false);
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
|
||||||
|
} finally {
|
||||||
|
if (zkw != null) {
|
||||||
|
zkw.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check Master, same logic.
|
||||||
|
MasterKeepAliveConnection master = null;
|
||||||
|
try {
|
||||||
|
master = connection.getKeepAliveMaster();
|
||||||
|
master.isMasterRunning();
|
||||||
|
} finally {
|
||||||
|
if (master != null) {
|
||||||
|
master.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1608,6 +1677,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
return Regions;
|
return Regions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (this.connection != null) {
|
if (this.connection != null) {
|
||||||
this.connection.close();
|
this.connection.close();
|
||||||
|
@ -1654,4 +1724,32 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see {@link #execute}
|
||||||
|
*/
|
||||||
|
private abstract static class MasterCallable<V> implements Callable<V>{
|
||||||
|
protected MasterKeepAliveConnection master;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method allows to execute a function requiring a connection to
|
||||||
|
* master without having to manage the connection creation/close.
|
||||||
|
* Create a {@link MasterCallable} to use it.
|
||||||
|
*/
|
||||||
|
private <V> V execute(MasterCallable<V> function) throws IOException {
|
||||||
|
function.master = connection.getKeepAliveMaster();
|
||||||
|
try {
|
||||||
|
return function.call();
|
||||||
|
} catch (RemoteException re) {
|
||||||
|
throw re.unwrapRemoteException();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
// This should not happen...
|
||||||
|
throw new IOException("Unexpected exception when calling master", e);
|
||||||
|
} finally {
|
||||||
|
function.master.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,16 +73,24 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated Removed because it was a mistake exposing zookeeper in this
|
* @deprecated Removed because it was a mistake exposing zookeeper in this
|
||||||
* interface (ZooKeeper is an implementation detail).
|
* interface (ZooKeeper is an implementation detail).
|
||||||
|
* Deprecated in HBase 0.94
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
|
public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return proxy connection to master server for this instance
|
* @return proxy connection to master server for this instance
|
||||||
* @throws MasterNotRunningException if the master is not running
|
* @throws MasterNotRunningException if the master is not running
|
||||||
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
|
||||||
|
* @deprecated Removed because it was a mistake exposing master in this
|
||||||
|
* interface (master is an implementation detail). Master functions are
|
||||||
|
* available from HConnection or HBaseAdmin, without having to use
|
||||||
|
* directly the master.
|
||||||
|
* Deprecated in HBase 0.94
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public HMasterInterface getMaster()
|
public HMasterInterface getMaster()
|
||||||
throws MasterNotRunningException, ZooKeeperConnectionException;
|
throws MasterNotRunningException, ZooKeeperConnectionException;
|
||||||
|
|
||||||
/** @return - true if the master server is running */
|
/** @return - true if the master server is running */
|
||||||
public boolean isMasterRunning()
|
public boolean isMasterRunning()
|
||||||
|
@ -197,6 +205,7 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated Use {@link #getHRegionConnection(String, int)}
|
* @deprecated Use {@link #getHRegionConnection(String, int)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public HRegionInterface getHRegionConnection(HServerAddress regionServer)
|
public HRegionInterface getHRegionConnection(HServerAddress regionServer)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -217,8 +226,9 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @param getMaster - do we check if master is alive
|
* @param getMaster - do we check if master is alive
|
||||||
* @return proxy for HRegionServer
|
* @return proxy for HRegionServer
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated Use {@link #getHRegionConnection(HServerAddress, boolean)}
|
* @deprecated Use {@link #getHRegionConnection(String, int)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public HRegionInterface getHRegionConnection(HServerAddress regionServer,
|
public HRegionInterface getHRegionConnection(HServerAddress regionServer,
|
||||||
boolean getMaster)
|
boolean getMaster)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
@ -259,6 +269,7 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @throws RuntimeException other unspecified error
|
* @throws RuntimeException other unspecified error
|
||||||
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
||||||
throws IOException, RuntimeException;
|
throws IOException, RuntimeException;
|
||||||
|
|
||||||
|
@ -272,6 +283,7 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @throws RuntimeException other unspecified error
|
* @throws RuntimeException other unspecified error
|
||||||
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||||
throws IOException, RuntimeException;
|
throws IOException, RuntimeException;
|
||||||
|
|
||||||
|
@ -365,6 +377,7 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated This method will be changed from public to package protected.
|
* @deprecated This method will be changed from public to package protected.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public int getCurrentNrHRS() throws IOException;
|
public int getCurrentNrHRS() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -386,3 +399,4 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void clearCaches(final String sn);
|
public void clearCaches(final String sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -269,7 +269,7 @@ public class HTable implements HTableInterface {
|
||||||
* @param tableName Name of table to check.
|
* @param tableName Name of table to check.
|
||||||
* @return {@code true} if table is online.
|
* @return {@code true} if table is online.
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
|
* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static boolean isTableEnabled(Configuration conf, String tableName)
|
public static boolean isTableEnabled(Configuration conf, String tableName)
|
||||||
|
@ -283,7 +283,9 @@ public class HTable implements HTableInterface {
|
||||||
* @param tableName Name of table to check.
|
* @param tableName Name of table to check.
|
||||||
* @return {@code true} if table is online.
|
* @return {@code true} if table is online.
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[] tableName)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public static boolean isTableEnabled(Configuration conf,
|
public static boolean isTableEnabled(Configuration conf,
|
||||||
final byte[] tableName) throws IOException {
|
final byte[] tableName) throws IOException {
|
||||||
return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
|
return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
|
||||||
|
@ -345,6 +347,7 @@ public class HTable implements HTableInterface {
|
||||||
* @deprecated This method will be changed from public to package protected.
|
* @deprecated This method will be changed from public to package protected.
|
||||||
*/
|
*/
|
||||||
// TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
|
// TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
|
||||||
|
@Deprecated
|
||||||
public HConnection getConnection() {
|
public HConnection getConnection() {
|
||||||
return this.connection;
|
return this.connection;
|
||||||
}
|
}
|
||||||
|
@ -355,6 +358,7 @@ public class HTable implements HTableInterface {
|
||||||
* The default value comes from {@code hbase.client.scanner.caching}.
|
* The default value comes from {@code hbase.client.scanner.caching}.
|
||||||
* @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
|
* @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public int getScannerCaching() {
|
public int getScannerCaching() {
|
||||||
return scannerCaching;
|
return scannerCaching;
|
||||||
}
|
}
|
||||||
|
@ -370,6 +374,7 @@ public class HTable implements HTableInterface {
|
||||||
* @param scannerCaching the number of rows a scanner will fetch at once.
|
* @param scannerCaching the number of rows a scanner will fetch at once.
|
||||||
* @deprecated Use {@link Scan#setCaching(int)}
|
* @deprecated Use {@link Scan#setCaching(int)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setScannerCaching(int scannerCaching) {
|
public void setScannerCaching(int scannerCaching) {
|
||||||
this.scannerCaching = scannerCaching;
|
this.scannerCaching = scannerCaching;
|
||||||
}
|
}
|
||||||
|
@ -447,6 +452,7 @@ public class HTable implements HTableInterface {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()}
|
* @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
|
public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
|
||||||
final Map<HRegionInfo, HServerAddress> regionMap =
|
final Map<HRegionInfo, HServerAddress> regionMap =
|
||||||
new TreeMap<HRegionInfo, HServerAddress>();
|
new TreeMap<HRegionInfo, HServerAddress>();
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2012 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ipc.HMasterInterface;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A KeepAlive connection is not physically closed immediately after the close,
|
||||||
|
* but rather kept alive for a few minutes. It makes sense only if it's shared.
|
||||||
|
*
|
||||||
|
* This interface is used by a dynamic proxy. It allows to have a #close
|
||||||
|
* function in a master client.
|
||||||
|
*
|
||||||
|
* This class is intended to be used internally by HBase classes; but not by
|
||||||
|
* final user code. Hence it's package protected.
|
||||||
|
*/
|
||||||
|
interface MasterKeepAliveConnection extends HMasterInterface, Closeable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close();
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2012 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We inherit the current ZooKeeperWatcher implementation to change the semantic
|
||||||
|
* of the close: the new close won't immediately close the connection but
|
||||||
|
* will have a keep alive. See {@link HConnection}.
|
||||||
|
* This allows to make it available with a consistent interface. The whole
|
||||||
|
* ZooKeeperWatcher use in HConnection will be then changed to remove the
|
||||||
|
* watcher part.
|
||||||
|
*
|
||||||
|
* This class is intended to be used internally by HBase classes; but not by
|
||||||
|
* final user code. Hence it's package protected.
|
||||||
|
*/
|
||||||
|
class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
|
||||||
|
ZooKeeperKeepAliveConnection(
|
||||||
|
Configuration conf, String descriptor,
|
||||||
|
HConnectionManager.HConnectionImplementation conn) throws IOException {
|
||||||
|
super(conf, descriptor, conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void internalClose(){
|
||||||
|
super.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,9 +24,7 @@ import java.io.IOException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
||||||
|
@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
|
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -111,16 +110,34 @@ public class VerifyReplication {
|
||||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||||
@Override
|
@Override
|
||||||
public Void connect(HConnection conn) throws IOException {
|
public Void connect(HConnection conn) throws IOException {
|
||||||
|
ZooKeeperWatcher localZKW = null;
|
||||||
|
ReplicationZookeeper zk = null;
|
||||||
|
ReplicationPeer peer = null;
|
||||||
try {
|
try {
|
||||||
ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
|
localZKW = new ZooKeeperWatcher(
|
||||||
conn.getZooKeeperWatcher());
|
conf, "VerifyReplication", new Abortable() {
|
||||||
ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
|
@Override public void abort(String why, Throwable e) {}
|
||||||
|
@Override public boolean isAborted() {return false;}
|
||||||
|
});
|
||||||
|
zk = new ReplicationZookeeper(conn, conf, localZKW);
|
||||||
|
// Just verifying it we can connect
|
||||||
|
peer = zk.getPeer(peerId);
|
||||||
HTable replicatedTable = new HTable(peer.getConfiguration(),
|
HTable replicatedTable = new HTable(peer.getConfiguration(),
|
||||||
conf.get(NAME+".tableName"));
|
conf.get(NAME+".tableName"));
|
||||||
scan.setStartRow(value.getRow());
|
scan.setStartRow(value.getRow());
|
||||||
replicatedScanner = replicatedTable.getScanner(scan);
|
replicatedScanner = replicatedTable.getScanner(scan);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Got a ZK exception", e);
|
throw new IOException("Got a ZK exception", e);
|
||||||
|
} finally {
|
||||||
|
if (peer != null) {
|
||||||
|
peer.close();
|
||||||
|
}
|
||||||
|
if (zk != null) {
|
||||||
|
zk.close();
|
||||||
|
}
|
||||||
|
if (localZKW != null) {
|
||||||
|
localZKW.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -160,11 +177,18 @@ public class VerifyReplication {
|
||||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||||
@Override
|
@Override
|
||||||
public Void connect(HConnection conn) throws IOException {
|
public Void connect(HConnection conn) throws IOException {
|
||||||
|
ZooKeeperWatcher localZKW = null;
|
||||||
|
ReplicationZookeeper zk = null;
|
||||||
|
ReplicationPeer peer = null;
|
||||||
try {
|
try {
|
||||||
ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
|
localZKW = new ZooKeeperWatcher(
|
||||||
conn.getZooKeeperWatcher());
|
conf, "VerifyReplication", new Abortable() {
|
||||||
|
@Override public void abort(String why, Throwable e) {}
|
||||||
|
@Override public boolean isAborted() {return false;}
|
||||||
|
});
|
||||||
|
zk = new ReplicationZookeeper(conn, conf, localZKW);
|
||||||
// Just verifying it we can connect
|
// Just verifying it we can connect
|
||||||
ReplicationPeer peer = zk.getPeer(peerId);
|
peer = zk.getPeer(peerId);
|
||||||
if (peer == null) {
|
if (peer == null) {
|
||||||
throw new IOException("Couldn't get access to the slave cluster," +
|
throw new IOException("Couldn't get access to the slave cluster," +
|
||||||
"please see the log");
|
"please see the log");
|
||||||
|
@ -172,6 +196,16 @@ public class VerifyReplication {
|
||||||
} catch (KeeperException ex) {
|
} catch (KeeperException ex) {
|
||||||
throw new IOException("Couldn't get access to the slave cluster" +
|
throw new IOException("Couldn't get access to the slave cluster" +
|
||||||
" because: ", ex);
|
" because: ", ex);
|
||||||
|
} finally {
|
||||||
|
if (peer != null){
|
||||||
|
peer.close();
|
||||||
|
}
|
||||||
|
if (zk != null){
|
||||||
|
zk.close();
|
||||||
|
}
|
||||||
|
if (localZKW != null){
|
||||||
|
localZKW.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
* sessions and re-establishing the ZK connections.
|
* sessions and re-establishing the ZK connections.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationPeer implements Abortable {
|
public class ReplicationPeer implements Abortable, Closeable {
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
|
||||||
|
|
||||||
private final String clusterKey;
|
private final String clusterKey;
|
||||||
|
@ -135,7 +136,7 @@ public class ReplicationPeer implements Abortable {
|
||||||
public void reloadZkWatcher() throws IOException {
|
public void reloadZkWatcher() throws IOException {
|
||||||
if (zkw != null) zkw.close();
|
if (zkw != null) zkw.close();
|
||||||
zkw = new ZooKeeperWatcher(conf,
|
zkw = new ZooKeeperWatcher(conf,
|
||||||
"connection to cluster: " + id, this);
|
"connection to cluster: " + id, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -144,4 +145,11 @@ public class ReplicationPeer implements Abortable {
|
||||||
// abort method is called.
|
// abort method is called.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (zkw != null){
|
||||||
|
zkw.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -76,7 +77,7 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationZookeeper {
|
public class ReplicationZookeeper implements Closeable{
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(ReplicationZookeeper.class);
|
LogFactory.getLog(ReplicationZookeeper.class);
|
||||||
// Name of znode we use to lock when failover
|
// Name of znode we use to lock when failover
|
||||||
|
@ -746,6 +747,12 @@ public class ReplicationZookeeper {
|
||||||
return peersZNode;
|
return peersZNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (statusTracker != null)
|
||||||
|
statusTracker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracker for status of the replication
|
* Tracker for status of the replication
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -58,22 +58,15 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.*;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
||||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
||||||
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
|
import org.apache.hadoop.hbase.zookeeper.*;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKTable;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
@ -137,10 +130,9 @@ public class HBaseFsck {
|
||||||
executor.allowCoreThreadTimeOut(true);
|
executor.allowCoreThreadTimeOut(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void connect() throws MasterNotRunningException,
|
public void connect() throws IOException {
|
||||||
ZooKeeperConnectionException {
|
|
||||||
admin = new HBaseAdmin(conf);
|
admin = new HBaseAdmin(conf);
|
||||||
status = admin.getMaster().getClusterStatus();
|
status = admin.getClusterStatus();
|
||||||
connection = admin.getConnection();
|
connection = admin.getConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class Merge extends Configured implements Tool {
|
||||||
LOG.info("Verifying that HBase is not running...");
|
LOG.info("Verifying that HBase is not running...");
|
||||||
try {
|
try {
|
||||||
HBaseAdmin.checkHBaseAvailable(getConf());
|
HBaseAdmin.checkHBaseAvailable(getConf());
|
||||||
LOG.fatal("HBase cluster must be off-line.");
|
LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
|
||||||
return -1;
|
return -1;
|
||||||
} catch (ZooKeeperConnectionException zkce) {
|
} catch (ZooKeeperConnectionException zkce) {
|
||||||
// If no zk, presume no master.
|
// If no zk, presume no master.
|
||||||
|
|
|
@ -296,7 +296,7 @@ public class RecoverableZooKeeper {
|
||||||
RetryCounter retryCounter = retryCounterFactory.create();
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
byte[] revData = zk.getData(path, watcher, stat);
|
byte[] revData = zk.getData(path, watcher, stat);
|
||||||
return this.removeMetaData(revData);
|
return this.removeMetaData(revData);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
switch (e.code()) {
|
switch (e.code()) {
|
||||||
|
|
|
@ -90,7 +90,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
|
||||||
* @return Returns null if <code>data</code> is null else converts passed data
|
* @return Returns null if <code>data</code> is null else converts passed data
|
||||||
* to a ServerName instance.
|
* to a ServerName instance.
|
||||||
*/
|
*/
|
||||||
private static ServerName dataToServerName(final byte [] data) {
|
public static ServerName dataToServerName(final byte [] data) {
|
||||||
// The str returned could be old style -- pre hbase-1502 -- which was
|
// The str returned could be old style -- pre hbase-1502 -- which was
|
||||||
// hostname and port seperated by a colon rather than hostname, port and
|
// hostname and port seperated by a colon rather than hostname, port and
|
||||||
// startcode delimited by a ','.
|
// startcode delimited by a ','.
|
||||||
|
|
|
@ -35,9 +35,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.EmptyWatcher;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.executor.RegionTransitionData;
|
import org.apache.hadoop.hbase.executor.RegionTransitionData;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
@ -253,9 +251,6 @@ public class ZKUtil {
|
||||||
/**
|
/**
|
||||||
* Check if the specified node exists. Sets no watches.
|
* Check if the specified node exists. Sets no watches.
|
||||||
*
|
*
|
||||||
* Returns true if node exists, false if not. Returns an exception if there
|
|
||||||
* is an unexpected zookeeper exception.
|
|
||||||
*
|
|
||||||
* @param zkw zk reference
|
* @param zkw zk reference
|
||||||
* @param znode path of node to watch
|
* @param znode path of node to watch
|
||||||
* @return version of the node if it exists, -1 if does not exist
|
* @return version of the node if it exists, -1 if does not exist
|
||||||
|
@ -463,7 +458,8 @@ public class ZKUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get znode data. Does not set a watcher.
|
* Get znode data. Does not set a watcher.
|
||||||
* @return ZNode data
|
* @return ZNode data, null if the node does not exist or if there is an
|
||||||
|
* error.
|
||||||
*/
|
*/
|
||||||
public static byte [] getData(ZooKeeperWatcher zkw, String znode)
|
public static byte [] getData(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
|
@ -1172,4 +1168,21 @@ public class ZKUtil {
|
||||||
throw new IOException(keeperEx);
|
throw new IOException(keeperEx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static byte[] blockUntilAvailable(
|
||||||
|
final ZooKeeperWatcher zkw, final String znode, final long timeout)
|
||||||
|
throws InterruptedException {
|
||||||
|
if (timeout < 0) throw new IllegalArgumentException();
|
||||||
|
if (zkw == null) throw new IllegalArgumentException();
|
||||||
|
if (znode == null) throw new IllegalArgumentException();
|
||||||
|
|
||||||
|
ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new Abortable() {
|
||||||
|
@Override public void abort(String why, Throwable e) {}
|
||||||
|
@Override public boolean isAborted() {return false;}
|
||||||
|
}) {
|
||||||
|
};
|
||||||
|
|
||||||
|
return znt.blockUntilAvailable(timeout, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,12 +125,38 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
|
||||||
long remaining = timeout;
|
long remaining = timeout;
|
||||||
if (refresh) {
|
if (refresh) {
|
||||||
try {
|
try {
|
||||||
|
// This does not create a watch if the node does not exists
|
||||||
this.data = ZKUtil.getDataAndWatch(watcher, node);
|
this.data = ZKUtil.getDataAndWatch(watcher, node);
|
||||||
} catch(KeeperException e) {
|
} catch(KeeperException e) {
|
||||||
|
// We use to abort here, but in some cases the abort is ignored (
|
||||||
|
// (empty Abortable), so it's better to log...
|
||||||
|
LOG.warn("Unexpected exception handling blockUntilAvailable", e);
|
||||||
abortable.abort("Unexpected exception handling blockUntilAvailable", e);
|
abortable.abort("Unexpected exception handling blockUntilAvailable", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
boolean nodeExistsChecked = (!refresh ||data!=null);
|
||||||
while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
|
while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
|
||||||
|
if (!nodeExistsChecked) {
|
||||||
|
try {
|
||||||
|
nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Got exception while trying to check existence in ZooKeeper" +
|
||||||
|
" of the node: "+node+", retrying if timeout not reached",e );
|
||||||
|
}
|
||||||
|
|
||||||
|
// It did not exists, and now it does.
|
||||||
|
if (nodeExistsChecked){
|
||||||
|
LOG.info("Node "+node+" now exists, resetting a watcher");
|
||||||
|
try {
|
||||||
|
// This does not create a watch if the node does not exists
|
||||||
|
this.data = ZKUtil.getDataAndWatch(watcher, node);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn("Unexpected exception handling blockUntilAvailable", e);
|
||||||
|
abortable.abort("Unexpected exception handling blockUntilAvailable", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// We expect a notification; but we wait with a
|
// We expect a notification; but we wait with a
|
||||||
// a timeout to lower the impact of a race condition if any
|
// a timeout to lower the impact of a race condition if any
|
||||||
wait(100);
|
wait(100);
|
||||||
|
@ -215,7 +241,8 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
abortable
|
abortable
|
||||||
.abort(
|
.abort(
|
||||||
"Exception while checking if basenode exists.",
|
"Exception while checking if basenode ("+watcher.baseZNode+
|
||||||
|
") exists in ZooKeeper.",
|
||||||
e);
|
e);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.zookeeper;
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -55,7 +56,7 @@ import org.apache.zookeeper.data.ACL;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class ZooKeeperWatcher implements Watcher, Abortable {
|
public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
||||||
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
|
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
|
||||||
|
|
||||||
// Identifier for this watcher (for logging only). It is made of the prefix
|
// Identifier for this watcher (for logging only). It is made of the prefix
|
||||||
|
@ -69,7 +70,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
private RecoverableZooKeeper recoverableZooKeeper;
|
private RecoverableZooKeeper recoverableZooKeeper;
|
||||||
|
|
||||||
// abortable in case of zk failure
|
// abortable in case of zk failure
|
||||||
private Abortable abortable;
|
protected Abortable abortable;
|
||||||
|
|
||||||
// listeners to be notified
|
// listeners to be notified
|
||||||
private final List<ZooKeeperListener> listeners =
|
private final List<ZooKeeperListener> listeners =
|
||||||
|
@ -438,15 +439,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the connection to ZooKeeper.
|
* Close the connection to ZooKeeper.
|
||||||
|
*
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
if (recoverableZooKeeper != null) {
|
if (recoverableZooKeeper != null) {
|
||||||
recoverableZooKeeper.close();
|
recoverableZooKeeper.close();
|
||||||
// super.close();
|
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1321,28 +1321,38 @@ public class HBaseTestingUtility {
|
||||||
expireSession(rs.getZooKeeper(), rs);
|
expireSession(rs.getZooKeeper(), rs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void expireSession(ZooKeeperWatcher nodeZK, Server server)
|
|
||||||
throws Exception {
|
|
||||||
expireSession(nodeZK, server, false);
|
public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
|
||||||
|
expireSession(nodeZK, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void expireSession(ZooKeeperWatcher nodeZK, Server server,
|
@Deprecated
|
||||||
boolean checkStatus) throws Exception {
|
public void expireSession(ZooKeeperWatcher nodeZK, Server server)
|
||||||
|
throws Exception {
|
||||||
|
expireSession(nodeZK, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Expire a ZooKeeer session as recommended in ZooKeeper documentation
|
||||||
|
* http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
|
||||||
|
*/
|
||||||
|
public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
|
||||||
|
throws Exception {
|
||||||
Configuration c = new Configuration(this.conf);
|
Configuration c = new Configuration(this.conf);
|
||||||
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
||||||
int sessionTimeout = 500;
|
|
||||||
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
|
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
|
||||||
byte[] password = zk.getSessionPasswd();
|
byte[] password = zk.getSessionPasswd();
|
||||||
long sessionID = zk.getSessionId();
|
long sessionID = zk.getSessionId();
|
||||||
|
|
||||||
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
||||||
sessionTimeout, EmptyWatcher.instance, sessionID, password);
|
1000, EmptyWatcher.instance, sessionID, password);
|
||||||
newZK.close();
|
newZK.close();
|
||||||
final long sleep = 7000; // 7s seems enough to manage the timeout
|
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
|
||||||
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) +
|
|
||||||
"; sleeping=" + sleep);
|
|
||||||
|
|
||||||
Thread.sleep(sleep);
|
// There is actually no reason to sleep here. Session is expired.
|
||||||
|
// May be for old ZK versions?
|
||||||
|
// Thread.sleep(sleep);
|
||||||
|
|
||||||
if (checkStatus) {
|
if (checkStatus) {
|
||||||
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME).close();
|
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME).close();
|
||||||
|
@ -1361,7 +1371,7 @@ public class HBaseTestingUtility {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a HBaseAdmin instance.
|
* Returns a HBaseAdmin instance.
|
||||||
* This instance is shared between HBaseTestingUtility intance users.
|
* This instance is shared between HBaseTestingUtility instance users.
|
||||||
* Don't close it, it will be closed automatically when the
|
* Don't close it, it will be closed automatically when the
|
||||||
* cluster shutdowns
|
* cluster shutdowns
|
||||||
*
|
*
|
||||||
|
@ -1371,7 +1381,7 @@ public class HBaseTestingUtility {
|
||||||
public synchronized HBaseAdmin getHBaseAdmin()
|
public synchronized HBaseAdmin getHBaseAdmin()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (hbaseAdmin == null){
|
if (hbaseAdmin == null){
|
||||||
hbaseAdmin = new HBaseAdmin(new Configuration(getConfiguration()));
|
hbaseAdmin = new HBaseAdmin(getConfiguration());
|
||||||
}
|
}
|
||||||
return hbaseAdmin;
|
return hbaseAdmin;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,41 +92,41 @@ public class TestZooKeeper {
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testClientSessionExpired()
|
public void testClientSessionExpired() throws Exception {
|
||||||
throws IOException, InterruptedException {
|
|
||||||
LOG.info("testClientSessionExpired");
|
LOG.info("testClientSessionExpired");
|
||||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
new HTable(c, HConstants.META_TABLE_NAME).close();
|
new HTable(c, HConstants.META_TABLE_NAME).close();
|
||||||
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
|
||||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
|
||||||
HConnection connection = HConnectionManager.getConnection(c);
|
HConnection connection = HConnectionManager.getConnection(c);
|
||||||
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
||||||
long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
|
|
||||||
byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
|
|
||||||
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
|
|
||||||
EmptyWatcher.instance, sessionID, password);
|
|
||||||
LOG.info("Session timeout=" + zk.getSessionTimeout() +
|
|
||||||
", original=" + sessionTimeout +
|
|
||||||
", id=" + zk.getSessionId());
|
|
||||||
zk.close();
|
|
||||||
|
|
||||||
Thread.sleep(sessionTimeout * 3L);
|
TEST_UTIL.expireSession(connectionZK);
|
||||||
|
|
||||||
|
// Depending on how long you wait here, the state after dump will
|
||||||
|
// be 'closed' or 'Connecting'.
|
||||||
|
// There should be no reason to wait, the connection is closed on the server
|
||||||
|
// Thread.sleep(sessionTimeout * 3L);
|
||||||
|
|
||||||
|
LOG.info("Before dump state=" +
|
||||||
|
connectionZK.getRecoverableZooKeeper().getState());
|
||||||
// provoke session expiration by doing something with ZK
|
// provoke session expiration by doing something with ZK
|
||||||
ZKUtil.dump(connectionZK);
|
ZKUtil.dump(connectionZK);
|
||||||
|
|
||||||
// Check that the old ZK connection is closed, means we did expire
|
// Check that the old ZK connection is closed, means we did expire
|
||||||
System.err.println("ZooKeeper should have timed out");
|
LOG.info("ZooKeeper should have timed out");
|
||||||
String state = connectionZK.getRecoverableZooKeeper().getState().toString();
|
States state = connectionZK.getRecoverableZooKeeper().getState();
|
||||||
LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
|
LOG.info("After dump state=" + state);
|
||||||
Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
|
Assert.assertTrue(state == States.CLOSED);
|
||||||
equals(States.CLOSED));
|
|
||||||
|
|
||||||
// Check that the client recovered
|
// Check that the client recovered
|
||||||
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
||||||
LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
|
//Here, if you wait, you will have a CONNECTED state. If you don't,
|
||||||
Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
|
// you may have the CONNECTING one.
|
||||||
States.CONNECTED));
|
//Thread.sleep(sessionTimeout * 3L);
|
||||||
|
States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
|
||||||
|
LOG.info("After new get state=" +state2);
|
||||||
|
Assert.assertTrue(
|
||||||
|
state2 == States.CONNECTED || state2 == States.CONNECTING);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -141,6 +141,7 @@ public class TestZooKeeper {
|
||||||
public void testMasterSessionExpired() throws Exception {
|
public void testMasterSessionExpired() throws Exception {
|
||||||
LOG.info("Starting testMasterSessionExpired");
|
LOG.info("Starting testMasterSessionExpired");
|
||||||
TEST_UTIL.expireMasterSession();
|
TEST_UTIL.expireMasterSession();
|
||||||
|
Thread.sleep(7000); // Helps the test to succeed!!!
|
||||||
testSanity();
|
testSanity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +151,7 @@ public class TestZooKeeper {
|
||||||
*/
|
*/
|
||||||
public void testSanity() throws Exception{
|
public void testSanity() throws Exception{
|
||||||
HBaseAdmin admin =
|
HBaseAdmin admin =
|
||||||
new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration()));
|
new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||||
String tableName = "test"+System.currentTimeMillis();
|
String tableName = "test"+System.currentTimeMillis();
|
||||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
HColumnDescriptor family = new HColumnDescriptor("fam");
|
HColumnDescriptor family = new HColumnDescriptor("fam");
|
||||||
|
|
|
@ -749,6 +749,7 @@ public class TestAdmin {
|
||||||
admin.createTable(desc, splitKeys);
|
admin.createTable(desc, splitKeys);
|
||||||
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||||
Map<HRegionInfo, HServerAddress> regions = ht.getRegionsInfo();
|
Map<HRegionInfo, HServerAddress> regions = ht.getRegionsInfo();
|
||||||
|
ht.close();
|
||||||
assertEquals("Tried to create " + expectedRegions + " regions "
|
assertEquals("Tried to create " + expectedRegions + " regions "
|
||||||
+ "but only found " + regions.size(), expectedRegions, regions.size());
|
+ "but only found " + regions.size(), expectedRegions, regions.size());
|
||||||
// Disable table.
|
// Disable table.
|
||||||
|
@ -1482,6 +1483,37 @@ public class TestAdmin {
|
||||||
Assert.assertEquals(initialCount, finalCount) ;
|
Assert.assertEquals(initialCount, finalCount) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that we have an exception if the cluster is not there.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCheckHBaseAvailableWithoutCluster() {
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
// Change the ZK address to go to something not used.
|
||||||
|
conf.setInt(
|
||||||
|
"hbase.zookeeper.quorum",
|
||||||
|
conf.getInt("hbase.zookeeper.quorum", 9999)+10);
|
||||||
|
|
||||||
|
int initialCount = HConnectionTestingUtility.getConnectionCount();
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
HBaseAdmin.checkHBaseAvailable(conf);
|
||||||
|
assertTrue(false);
|
||||||
|
} catch (MasterNotRunningException ignored) {
|
||||||
|
} catch (ZooKeeperConnectionException ignored) {
|
||||||
|
}
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
|
||||||
|
int finalCount = HConnectionTestingUtility.getConnectionCount();
|
||||||
|
|
||||||
|
Assert.assertEquals(initialCount, finalCount) ;
|
||||||
|
|
||||||
|
LOG.info("It took "+(end-start)+" ms to find out that" +
|
||||||
|
" HBase was not available");
|
||||||
|
}
|
||||||
|
|
||||||
@org.junit.Rule
|
@org.junit.Rule
|
||||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||||
|
|
|
@ -19,21 +19,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -53,16 +45,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
|
||||||
import org.apache.hadoop.hbase.HServerAddress;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
|
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
@ -86,6 +69,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -95,6 +79,8 @@ import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run tests that use the HBase clients; {@link HTable} and {@link HTablePool}.
|
* Run tests that use the HBase clients; {@link HTable} and {@link HTablePool}.
|
||||||
* Sets up the HBase mini cluster once at start and runs through all client tests.
|
* Sets up the HBase mini cluster once at start and runs through all client tests.
|
||||||
|
@ -214,6 +200,91 @@ public class TestFromClientSide {
|
||||||
h.close();
|
h.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSharedZooKeeper() throws Exception {
|
||||||
|
Configuration newConfig = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
newConfig.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "12345");
|
||||||
|
|
||||||
|
// First with a simple ZKW
|
||||||
|
ZooKeeperWatcher z0 = new ZooKeeperWatcher(
|
||||||
|
newConfig, "hconnection", new Abortable() {
|
||||||
|
@Override public void abort(String why, Throwable e) {}
|
||||||
|
@Override public boolean isAborted() {return false;}
|
||||||
|
});
|
||||||
|
z0.getRecoverableZooKeeper().getZooKeeper().exists("/oldZooKeeperWatcher", false);
|
||||||
|
z0.close();
|
||||||
|
|
||||||
|
// Then a ZooKeeperKeepAliveConnection
|
||||||
|
HConnectionManager.HConnectionImplementation connection1 =
|
||||||
|
(HConnectionManager.HConnectionImplementation)
|
||||||
|
HConnectionManager.getConnection(newConfig);
|
||||||
|
|
||||||
|
ZooKeeperKeepAliveConnection z1 = connection1.getKeepAliveZooKeeperWatcher();
|
||||||
|
z1.getRecoverableZooKeeper().getZooKeeper().exists("/z1", false);
|
||||||
|
|
||||||
|
z1.close();
|
||||||
|
|
||||||
|
// will still work, because the real connection is not closed yet
|
||||||
|
// Not do be done in real code
|
||||||
|
z1.getRecoverableZooKeeper().getZooKeeper().exists("/z1afterclose", false);
|
||||||
|
|
||||||
|
|
||||||
|
ZooKeeperKeepAliveConnection z2 = connection1.getKeepAliveZooKeeperWatcher();
|
||||||
|
assertTrue(
|
||||||
|
"ZooKeeperKeepAliveConnection equals on same connection", z1 == z2);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Configuration newConfig2 = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
newConfig2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "6789");
|
||||||
|
HConnectionManager.HConnectionImplementation connection2 =
|
||||||
|
(HConnectionManager.HConnectionImplementation)
|
||||||
|
HConnectionManager.getConnection(newConfig2);
|
||||||
|
|
||||||
|
assertTrue("connections should be different ", connection1 != connection2);
|
||||||
|
|
||||||
|
ZooKeeperKeepAliveConnection z3 = connection2.getKeepAliveZooKeeperWatcher();
|
||||||
|
assertTrue(
|
||||||
|
"ZooKeeperKeepAliveConnection should be different" +
|
||||||
|
" on different connections", z1 != z3);
|
||||||
|
|
||||||
|
// Bypass the private access
|
||||||
|
Method m = HConnectionManager.HConnectionImplementation.class.
|
||||||
|
getDeclaredMethod("closeZooKeeperWatcher");
|
||||||
|
m.setAccessible(true);
|
||||||
|
m.invoke(connection2);
|
||||||
|
|
||||||
|
ZooKeeperKeepAliveConnection z4 = connection2.getKeepAliveZooKeeperWatcher();
|
||||||
|
assertTrue(
|
||||||
|
"ZooKeeperKeepAliveConnection should be recreated" +
|
||||||
|
" when previous connections was closed"
|
||||||
|
, z3 != z4);
|
||||||
|
|
||||||
|
|
||||||
|
z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
|
||||||
|
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
|
||||||
|
|
||||||
|
|
||||||
|
HConnectionManager.deleteConnection(newConfig, true);
|
||||||
|
try {
|
||||||
|
z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
|
||||||
|
assertTrue("We should not have a valid connection for z2", false);
|
||||||
|
} catch (Exception e){
|
||||||
|
}
|
||||||
|
|
||||||
|
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
|
||||||
|
// We expect success here.
|
||||||
|
|
||||||
|
|
||||||
|
HConnectionManager.deleteConnection(newConfig2, true);
|
||||||
|
try {
|
||||||
|
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
|
||||||
|
assertTrue("We should not have a valid connection for z4", false);
|
||||||
|
} catch (Exception e){
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HBASE-2468 use case 1 and 2: region info de/serialization
|
* HBASE-2468 use case 1 and 2: region info de/serialization
|
||||||
*/
|
*/
|
||||||
|
@ -4594,7 +4665,7 @@ public class TestFromClientSide {
|
||||||
assertNotNull(addrAfter);
|
assertNotNull(addrAfter);
|
||||||
assertTrue(addrAfter.getPort() != addrCache.getPort());
|
assertTrue(addrAfter.getPort() != addrCache.getPort());
|
||||||
assertEquals(addrAfter.getPort(), addrNoCache.getPort());
|
assertEquals(addrAfter.getPort(), addrNoCache.getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/**
|
/**
|
||||||
|
@ -4654,8 +4725,9 @@ public class TestFromClientSide {
|
||||||
regionsList = table.getRegionsInRange(startKey, endKey);
|
regionsList = table.getRegionsInRange(startKey, endKey);
|
||||||
assertEquals(1, regionsList.size());
|
assertEquals(1, regionsList.size());
|
||||||
}
|
}
|
||||||
@org.junit.Rule
|
|
||||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
@org.junit.Rule
|
||||||
|
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.master;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.MediumTests;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
@Category(SmallTests.class)
|
||||||
public class TestDeadServer {
|
public class TestDeadServer {
|
||||||
@Test public void testIsDead() {
|
@Test public void testIsDead() {
|
||||||
DeadServer ds = new DeadServer();
|
DeadServer ds = new DeadServer();
|
||||||
|
|
|
@ -45,10 +45,10 @@ import org.junit.experimental.categories.Category;
|
||||||
* Tests the restarting of everything as done during rolling restarts.
|
* Tests the restarting of everything as done during rolling restarts.
|
||||||
*/
|
*/
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestRollingRestart {
|
public class TestRollingRestart {
|
||||||
private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);
|
private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);
|
||||||
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=500000)
|
||||||
public void testBasicRollingRestart() throws Exception {
|
public void testBasicRollingRestart() throws Exception {
|
||||||
|
|
||||||
// Start a cluster with 2 masters and 4 regionservers
|
// Start a cluster with 2 masters and 4 regionservers
|
||||||
|
@ -181,7 +181,7 @@ public class TestRollingRestart {
|
||||||
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
|
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
Thread.sleep(2000);
|
Thread.sleep(1000);
|
||||||
assertRegionsAssigned(cluster, regions);
|
assertRegionsAssigned(cluster, regions);
|
||||||
|
|
||||||
// Bring the RS hosting ROOT down and the RS hosting META down at once
|
// Bring the RS hosting ROOT down and the RS hosting META down at once
|
||||||
|
|
|
@ -46,13 +46,14 @@ public class TestReplicationPeer {
|
||||||
rp = new ReplicationPeer(conf, "clusterKey", "clusterId");
|
rp = new ReplicationPeer(conf, "clusterKey", "clusterId");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
@Test(timeout=300000)
|
||||||
public void testResetZooKeeperSession() throws Exception {
|
public void testResetZooKeeperSession() throws Exception {
|
||||||
ZooKeeperWatcher zkw = rp.getZkw();
|
ZooKeeperWatcher zkw = rp.getZkw();
|
||||||
zkw.getRecoverableZooKeeper().exists("/1/2", false);
|
zkw.getRecoverableZooKeeper().exists("/1/2", false);
|
||||||
|
|
||||||
LOG.info("Expiring ReplicationPeer ZooKeeper session.");
|
LOG.info("Expiring ReplicationPeer ZooKeeper session.");
|
||||||
utility.expireSession(zkw, null, false);
|
utility.expireSession(zkw);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session.");
|
LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session.");
|
||||||
|
|
|
@ -77,8 +77,6 @@ public class TestRemoteTable {
|
||||||
TEST_UTIL.startMiniCluster();
|
TEST_UTIL.startMiniCluster();
|
||||||
REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration());
|
REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration());
|
||||||
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||||
LOG.info("Admin Connection=" + admin.getConnection() + ", " +
|
|
||||||
admin.getConnection().getZooKeeperWatcher());
|
|
||||||
if (!admin.tableExists(TABLE)) {
|
if (!admin.tableExists(TABLE)) {
|
||||||
HTableDescriptor htd = new HTableDescriptor(TABLE);
|
HTableDescriptor htd = new HTableDescriptor(TABLE);
|
||||||
htd.addFamily(new HColumnDescriptor(COLUMN_1));
|
htd.addFamily(new HColumnDescriptor(COLUMN_1));
|
||||||
|
@ -86,8 +84,6 @@ public class TestRemoteTable {
|
||||||
htd.addFamily(new HColumnDescriptor(COLUMN_3));
|
htd.addFamily(new HColumnDescriptor(COLUMN_3));
|
||||||
admin.createTable(htd);
|
admin.createTable(htd);
|
||||||
HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE);
|
HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE);
|
||||||
LOG.info("Table connection=" + table.getConnection() + ", " +
|
|
||||||
admin.getConnection().getZooKeeperWatcher());
|
|
||||||
Put put = new Put(ROW_1);
|
Put put = new Put(ROW_1);
|
||||||
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
|
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
|
||||||
table.put(put);
|
table.put(put);
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class TestMergeTable {
|
||||||
MetaReader.getTableRegions(ct, desc.getName());
|
MetaReader.getTableRegions(ct, desc.getName());
|
||||||
LOG.info("originalTableRegions size=" + originalTableRegions.size() +
|
LOG.info("originalTableRegions size=" + originalTableRegions.size() +
|
||||||
"; " + originalTableRegions);
|
"; " + originalTableRegions);
|
||||||
HBaseAdmin admin = new HBaseAdmin(new Configuration(c));
|
HBaseAdmin admin = new HBaseAdmin(c);
|
||||||
admin.disableTable(desc.getName());
|
admin.disableTable(desc.getName());
|
||||||
HMerge.merge(c, FileSystem.get(c), desc.getName());
|
HMerge.merge(c, FileSystem.get(c), desc.getName());
|
||||||
List<HRegionInfo> postMergeTableRegions =
|
List<HRegionInfo> postMergeTableRegions =
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class TestMergeTool extends HBaseTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
// Set the timeout down else this test will take a while to complete.
|
// Set the timeout down else this test will take a while to complete.
|
||||||
this.conf.setLong("hbase.zookeeper.recoverable.waittime", 1000);
|
this.conf.setLong("hbase.zookeeper.recoverable.waittime", 10);
|
||||||
// Make it so we try and connect to a zk that is not there (else we might
|
// Make it so we try and connect to a zk that is not there (else we might
|
||||||
// find a zk ensemble put up by another concurrent test and this will
|
// find a zk ensemble put up by another concurrent test and this will
|
||||||
// mess up this test. Choose unlikely port. Default test port is 21818.
|
// mess up this test. Choose unlikely port. Default test port is 21818.
|
||||||
|
@ -186,7 +186,7 @@ public class TestMergeTool extends HBaseTestCase {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Merge merger = new Merge(this.conf);
|
Merge merger = new Merge(this.conf);
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
System.out.println("fs2=" + this.conf.get("fs.defaultFS"));
|
LOG.info("fs2=" + this.conf.get("fs.defaultFS"));
|
||||||
int errCode = ToolRunner.run(this.conf, merger,
|
int errCode = ToolRunner.run(this.conf, merger,
|
||||||
new String[] {this.desc.getNameAsString(), regionName1, regionName2}
|
new String[] {this.desc.getNameAsString(), regionName1, regionName2}
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue