HBASE-20020 Make sure we throw DoNotRetryIOException when ConnectionImplementation is closed

zhangduo 2018-02-20 16:50:03 +08:00 committed by Michael Stack
parent c1fe9f441c
commit 34d3e847cc
8 changed files with 59 additions and 93 deletions
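
The gist of the patch: ConnectionImplementation gains a private checkClosed() helper that throws DoNotRetryIOException, and the entry points that can be reached through a closed connection (locateRegion, getAdmin, getClient, getTableState, isTableAvailable) call it before doing any work, so callers fail fast instead of retrying against a dead connection. A minimal sketch of the pattern follows; everything except DoNotRetryIOException is an illustrative stand-in, not code copied from the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.DoNotRetryIOException;

// Illustrative stand-in for ConnectionImplementation, not the real class.
class ClosableConnectionSketch {
  private volatile boolean closed;

  // DoNotRetryIOException tells the retrying machinery to give up immediately.
  private void checkClosed() throws DoNotRetryIOException {
    if (closed) {
      throw new DoNotRetryIOException(this + " closed");
    }
  }

  // Every client-facing entry point guards itself before doing any work.
  void locateRegion(byte[] row) throws IOException {
    checkClosed();
    // ... region lookup would happen here ...
  }

  void close() {
    closed = true;
  }
}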

View File: MasterNotRunningException.java

@@ -18,15 +18,13 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrown if the master is not running
*/
@InterfaceAudience.Public
public class MasterNotRunningException extends IOException {
public class MasterNotRunningException extends HBaseIOException {
private static final long serialVersionUID = (1L << 23) - 1L;
/** default constructor */
public MasterNotRunningException() {

View File: ClusterConnection.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
@@ -29,12 +28,12 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/** Internal methods on Connection that should not be used by user code. */
@InterfaceAudience.Private
@@ -224,7 +223,7 @@ public interface ClusterConnection extends Connection {
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
MasterService.BlockingInterface getMaster() throws IOException;
MasterKeepAliveConnection getMaster() throws IOException;
/**
* Get the admin service for master.
@@ -258,8 +257,7 @@ public interface ClusterConnection extends Connection {
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
throws IOException;
/**
@@ -268,24 +266,6 @@ public interface ClusterConnection extends Connection {
*/
void clearCaches(final ServerName sn);
/**
* This function allows HBaseAdmin and potentially others to get a shared MasterService
* connection.
* @return The shared instance. Never returns null.
* @throws MasterNotRunningException if master is not running
* @deprecated Since 0.96.0
*/
@Deprecated
MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException;
/**
* @param serverName of server to check
* @return true if the server is known as dead, false otherwise.
* @deprecated internal method, do not use thru ClusterConnection */
@Deprecated
boolean isDeadServer(ServerName serverName);
/**
* @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
*/
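
Alongside the closed-check, the master stub accessors are consolidated: the deprecated getKeepAliveMasterService() is removed and getMaster() now returns the MasterKeepAliveConnection directly, throwing a plain IOException instead of forcing everything into MasterNotRunningException. A hedged sketch of the resulting calling convention, mirroring what isMasterRunning() does in the next file; the class and method names here are illustrative, and the sketch sits in the client package only because these are internal (@InterfaceAudience.Private) types.

package org.apache.hadoop.hbase.client;

import java.io.IOException;

// Illustrative sketch only; user code should not touch ClusterConnection.
class MasterStubUsageSketch {
  static boolean pingMaster(ClusterConnection conn) throws IOException {
    MasterKeepAliveConnection master = conn.getMaster(); // replaces getKeepAliveMasterService()
    try {
      // Master RPCs would be issued through the MasterService.BlockingInterface methods here.
      return true;
    } finally {
      master.close(); // releases the shared keep-alive stub, does not tear down the connection
    }
  }
}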

View File: ConnectionImplementation.java

@@ -538,6 +538,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.conf;
}
private void checkClosed() throws DoNotRetryIOException {
if (this.closed) {
throw new DoNotRetryIOException(toString() + " closed");
}
}
/**
* @return true if the master is running, throws an exception otherwise
* @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
@@ -545,20 +551,23 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
*/
@Deprecated
@Override
public boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException {
public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
// When getting the master connection, we check it's running,
// so if there is no exception, it means we've been able to get a
// connection on a running master
MasterKeepAliveConnection m = getKeepAliveMasterService();
MasterKeepAliveConnection m;
try {
m = getKeepAliveMasterService();
} catch (IOException e) {
throw new MasterNotRunningException(e);
}
m.close();
return true;
}
@Override
public HRegionLocation getRegionLocation(final TableName tableName,
final byte [] row, boolean reload)
throws IOException {
public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
boolean reload) throws IOException {
return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
}
@@ -576,9 +585,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
throws IOException {
if (this.closed) {
throw new IOException(toString() + " closed");
}
checkClosed();
try {
if (!isTableEnabled(tableName)) {
LOG.debug("Table " + tableName + " not enabled");
@@ -641,8 +648,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locations == null ? null : locations.getRegionLocation();
}
@Override
public boolean isDeadServer(ServerName sn) {
private boolean isDeadServer(ServerName sn) {
if (clusterStatusListener == null) {
return false;
} else {
@@ -679,19 +685,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
public HRegionLocation locateRegion(
final TableName tableName, final byte[] row) throws IOException{
public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
throws IOException {
RegionLocations locations = locateRegion(tableName, row, true, true);
return locations == null ? null : locations.getRegionLocation();
}
@Override
public HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException{
RegionLocations locations = relocateRegion(tableName, row,
RegionReplicaUtil.DEFAULT_REPLICA_ID);
return locations == null ? null :
locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
throws IOException {
RegionLocations locations =
relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
return locations == null ? null
: locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@Override
@@ -708,22 +714,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry)
throws IOException {
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry) throws IOException {
return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@Override
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry, int replicaId)
throws IOException {
if (this.closed) {
throw new DoNotRetryIOException(toString() + " closed");
}
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry, int replicaId) throws IOException {
checkClosed();
if (tableName == null || tableName.getName().length == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
throw new IllegalArgumentException("table name cannot be null or zero length");
}
if (tableName.equals(TableName.META_TABLE_NAME)) {
return locateMeta(tableName, useCache, replicaId);
@@ -1170,6 +1171,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
throws IOException {
checkClosed();
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
@@ -1184,6 +1186,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public BlockingInterface getClient(ServerName serverName) throws IOException {
checkClosed();
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
@@ -1199,7 +1202,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
final MasterServiceState masterServiceState = new MasterServiceState(this);
@Override
public MasterProtos.MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
public MasterKeepAliveConnection getMaster() throws IOException {
return getKeepAliveMasterService();
}
@@ -1207,20 +1210,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
mss.userCount++;
}
@Override
public MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException {
private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
synchronized (masterLock) {
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
try {
this.masterServiceState.stub = stubMaker.makeStub();
} catch (MasterNotRunningException ex) {
throw ex;
} catch (IOException e) {
// rethrow as MasterNotRunningException so that we can keep the method sig
throw new MasterNotRunningException(e);
}
}
resetMasterServiceState(this.masterServiceState);
}
@@ -1955,9 +1949,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public TableState getTableState(TableName tableName) throws IOException {
if (this.closed) {
throw new IOException(toString() + " closed");
}
checkClosed();
TableState tableState = MetaTableAccessor.getTableState(this, tableName);
if (tableState == null) {
throw new TableNotFoundException(tableName);
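
From the user's side, the net effect of routing these checks through DoNotRetryIOException is that operations issued over a connection that has already been closed surface a failure almost immediately instead of cycling through the retry/backoff loop. A hypothetical end-to-end sketch; the table name and row key are made up, and the exact wrapping of the DoNotRetryIOException depends on the operation's call path.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ClosedConnectionFailFast {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(TableName.valueOf("example_table")); // hypothetical table
    connection.close();
    try {
      // The region lookup behind this get() now hits checkClosed() and throws
      // a DoNotRetryIOException, so the call fails fast instead of retrying.
      table.get(new Get(Bytes.toBytes("row-1")));
    } catch (IOException e) {
      System.out.println("Failed fast on closed connection: " + e);
    } finally {
      table.close();
    }
  }
}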

View File: ConnectionUtils.java

@@ -31,12 +31,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -51,9 +49,11 @@ import org.apache.hadoop.net.DNS;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -155,11 +155,11 @@ public final class ConnectionUtils {
}
@Override
public MasterKeepAliveConnection getKeepAliveMasterService() throws MasterNotRunningException {
public MasterKeepAliveConnection getMaster() throws IOException {
if (this.localHostClient instanceof MasterService.BlockingInterface) {
return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient);
}
return super.getKeepAliveMasterService();
return super.getMaster();
}
}

View File: HTable.java

@@ -162,10 +162,7 @@ public class HTable implements Table {
final RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory,
final ExecutorService pool) {
if (connection == null || connection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
this.connection = connection;
this.connection = Preconditions.checkNotNull(connection, "connection is null");
this.configuration = connection.getConfiguration();
this.connConfiguration = connection.getConnectionConfiguration();
if (pool == null) {

View File: MasterCallable.java

@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* A RetryingCallable for Master RPC operations.
* Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of
@@ -55,7 +54,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
@Override
public void prepare(boolean reload) throws IOException {
this.master = this.connection.getKeepAliveMasterService();
this.master = this.connection.getMaster();
}
@Override
@@ -139,7 +138,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
}
private static boolean isMetaRegion(final byte[] regionName) {
return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
|| Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
return Bytes.equals(regionName, RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
Bytes.equals(regionName, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
}
}

View File: TestSnapshotFromAdmin.java

@@ -92,7 +92,7 @@ public class TestSnapshotFromAdmin {
// mock the master admin to our mock
MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class);
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(mockMaster);
Mockito.when(mockConnection.getMaster()).thenReturn(mockMaster);
// we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
@@ -163,7 +163,7 @@ public class TestSnapshotFromAdmin {
// mock the master connection
MasterKeepAliveConnection master = Mockito.mock(MasterKeepAliveConnection.class);
Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(master);
Mockito.when(mockConnection.getMaster()).thenReturn(master);
SnapshotResponse response = SnapshotResponse.newBuilder().setExpectedTimeout(0).build();
Mockito.when(
master.snapshot((RpcController) Mockito.any(), Mockito.any()))

View File: TestHBaseAdminNoCluster.java

@@ -102,7 +102,7 @@ public class TestHBaseAdminNoCluster {
Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
(CreateTableRequest)Mockito.any())).
thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test")));
Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
Mockito.when(connection.getMaster()).thenReturn(masterAdmin);
Admin admin = new HBaseAdmin(connection);
try {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
@@ -304,7 +304,7 @@ public class TestHBaseAdminNoCluster {
throw new MasterNotRunningException(); // all methods will throw an exception
}
});
Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
Mockito.when(connection.getMaster()).thenReturn(masterAdmin);
RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory);
Mockito.when(rpcControllerFactory.newController()).thenReturn(