HBASE-20020 Make sure we throw DoNotRetryIOException when ConnectionImplementation is closed

zhangduo 2018-02-20 16:50:03 +08:00 committed by Michael Stack
parent 391790ddb0
commit b7685307e4
8 changed files with 59 additions and 93 deletions
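
The gist of the change: once a ConnectionImplementation has been closed, the region-locating and service-getter paths go through a new checkClosed() helper and fail fast with DoNotRetryIOException instead of a plain IOException, so retrying callers give up immediately rather than retrying against a closed connection. Along the way, getKeepAliveMasterService() becomes a private detail behind getMaster(), and MasterNotRunningException now extends HBaseIOException. A rough caller-side sketch of the intended behavior follows; the table name, row key, and configuration are illustrative placeholders, not part of the patch:

// Hypothetical sketch of client-visible behavior after this patch; not taken from the commit itself.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class ClosedConnectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    connection.close();
    try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf("t"))) {
      // Region location goes through ConnectionImplementation.locateRegion(), which now
      // calls checkClosed() first, so this throws DoNotRetryIOException("... closed")
      // immediately instead of being retried.
      locator.getRegionLocation(Bytes.toBytes("row"));
    } catch (DoNotRetryIOException e) {
      System.out.println("fails fast: " + e.getMessage());
    }
  }
}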

MasterNotRunningException.java

@@ -18,15 +18,13 @@
  */
 package org.apache.hadoop.hbase;
-import java.io.IOException;
 import org.apache.yetus.audience.InterfaceAudience;
 /**
  * Thrown if the master is not running
  */
 @InterfaceAudience.Public
-public class MasterNotRunningException extends IOException {
+public class MasterNotRunningException extends HBaseIOException {
   private static final long serialVersionUID = (1L << 23) - 1L;
   /** default constructor */
   public MasterNotRunningException() {

ClusterConnection.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MasterNotRunningException;
@@ -29,12 +28,12 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 /** Internal methods on Connection that should not be used by user code. */
 @InterfaceAudience.Private
@@ -224,7 +223,7 @@ public interface ClusterConnection extends Connection {
   /**
    * Returns a {@link MasterKeepAliveConnection} to the active master
    */
-  MasterService.BlockingInterface getMaster() throws IOException;
+  MasterKeepAliveConnection getMaster() throws IOException;
   /**
    * Get the admin service for master.
@@ -258,9 +257,8 @@ public interface ClusterConnection extends Connection {
    * @return Location of row.
    * @throws IOException if a remote or network exception occurs
    */
-  HRegionLocation getRegionLocation(TableName tableName, byte [] row,
-    boolean reload)
-  throws IOException;
+  HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
+      throws IOException;
   /**
    * Clear any caches that pertain to server name <code>sn</code>.
@@ -268,24 +266,6 @@ public interface ClusterConnection extends Connection {
    */
   void clearCaches(final ServerName sn);
-  /**
-   * This function allows HBaseAdmin and potentially others to get a shared MasterService
-   * connection.
-   * @return The shared instance. Never returns null.
-   * @throws MasterNotRunningException if master is not running
-   * @deprecated Since 0.96.0
-   */
-  @Deprecated
-  MasterKeepAliveConnection getKeepAliveMasterService()
-  throws MasterNotRunningException;
-  /**
-   * @param serverName of server to check
-   * @return true if the server is known as dead, false otherwise.
-   * @deprecated internal method, do not use thru ClusterConnection */
-  @Deprecated
-  boolean isDeadServer(ServerName serverName);
   /**
    * @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
    */

ConnectionImplementation.java

@@ -538,6 +538,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return this.conf;
   }
+  private void checkClosed() throws DoNotRetryIOException {
+    if (this.closed) {
+      throw new DoNotRetryIOException(toString() + " closed");
+    }
+  }
   /**
    * @return true if the master is running, throws an exception otherwise
    * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
@@ -545,21 +551,24 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
    */
   @Deprecated
   @Override
-  public boolean isMasterRunning()
-  throws MasterNotRunningException, ZooKeeperConnectionException {
+  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
     // When getting the master connection, we check it's running,
     // so if there is no exception, it means we've been able to get a
     // connection on a running master
-    MasterKeepAliveConnection m = getKeepAliveMasterService();
+    MasterKeepAliveConnection m;
+    try {
+      m = getKeepAliveMasterService();
+    } catch (IOException e) {
+      throw new MasterNotRunningException(e);
+    }
     m.close();
     return true;
   }
   @Override
-  public HRegionLocation getRegionLocation(final TableName tableName,
-      final byte [] row, boolean reload)
-  throws IOException {
-    return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
+  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
+      boolean reload) throws IOException {
+    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
   }
@@ -576,9 +585,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   @Override
   public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
       throws IOException {
-    if (this.closed) {
-      throw new IOException(toString() + " closed");
-    }
+    checkClosed();
     try {
       if (!isTableEnabled(tableName)) {
         LOG.debug("Table " + tableName + " not enabled");
@@ -641,8 +648,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return locations == null ? null : locations.getRegionLocation();
   }
-  @Override
-  public boolean isDeadServer(ServerName sn) {
+  private boolean isDeadServer(ServerName sn) {
     if (clusterStatusListener == null) {
       return false;
     } else {
@@ -679,19 +685,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   }
   @Override
-  public HRegionLocation locateRegion(
-      final TableName tableName, final byte[] row) throws IOException{
+  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
+      throws IOException {
     RegionLocations locations = locateRegion(tableName, row, true, true);
     return locations == null ? null : locations.getRegionLocation();
   }
   @Override
-  public HRegionLocation relocateRegion(final TableName tableName,
-      final byte [] row) throws IOException{
-    RegionLocations locations = relocateRegion(tableName, row,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID);
-    return locations == null ? null :
-        locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
+      throws IOException {
+    RegionLocations locations =
+        relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    return locations == null ? null
+        : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
   }
@@ -708,22 +714,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   }
   @Override
-  public RegionLocations locateRegion(final TableName tableName,
-      final byte [] row, boolean useCache, boolean retry)
-    throws IOException {
+  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
+      boolean retry) throws IOException {
     return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
   }
   @Override
-  public RegionLocations locateRegion(final TableName tableName,
-      final byte [] row, boolean useCache, boolean retry, int replicaId)
-    throws IOException {
-    if (this.closed) {
-      throw new DoNotRetryIOException(toString() + " closed");
-    }
-    if (tableName== null || tableName.getName().length == 0) {
-      throw new IllegalArgumentException(
-          "table name cannot be null or zero length");
+  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    checkClosed();
+    if (tableName == null || tableName.getName().length == 0) {
+      throw new IllegalArgumentException("table name cannot be null or zero length");
     }
     if (tableName.equals(TableName.META_TABLE_NAME)) {
       return locateMeta(tableName, useCache, replicaId);
@@ -1170,6 +1171,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   @Override
   public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
       throws IOException {
+    checkClosed();
     if (isDeadServer(serverName)) {
       throw new RegionServerStoppedException(serverName + " is dead.");
     }
@@ -1184,6 +1186,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   @Override
   public BlockingInterface getClient(ServerName serverName) throws IOException {
+    checkClosed();
     if (isDeadServer(serverName)) {
       throw new RegionServerStoppedException(serverName + " is dead.");
     }
@@ -1199,7 +1202,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   final MasterServiceState masterServiceState = new MasterServiceState(this);
   @Override
-  public MasterProtos.MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
+  public MasterKeepAliveConnection getMaster() throws IOException {
     return getKeepAliveMasterService();
   }
@@ -1207,20 +1210,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     mss.userCount++;
   }
-  @Override
-  public MasterKeepAliveConnection getKeepAliveMasterService()
-  throws MasterNotRunningException {
+  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
     synchronized (masterLock) {
       if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
         MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
-        try {
-          this.masterServiceState.stub = stubMaker.makeStub();
-        } catch (MasterNotRunningException ex) {
-          throw ex;
-        } catch (IOException e) {
-          // rethrow as MasterNotRunningException so that we can keep the method sig
-          throw new MasterNotRunningException(e);
-        }
+        this.masterServiceState.stub = stubMaker.makeStub();
       }
       resetMasterServiceState(this.masterServiceState);
     }
@@ -1955,9 +1949,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   @Override
   public TableState getTableState(TableName tableName) throws IOException {
-    if (this.closed) {
-      throw new IOException(toString() + " closed");
-    }
+    checkClosed();
     TableState tableState = MetaTableAccessor.getTableState(this, tableName);
     if (tableState == null) {
       throw new TableNotFoundException(tableName);

ConnectionUtils.java

@@ -31,12 +31,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -51,9 +49,11 @@ import org.apache.hadoop.net.DNS;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -155,11 +155,11 @@ public final class ConnectionUtils {
     }
     @Override
-    public MasterKeepAliveConnection getKeepAliveMasterService() throws MasterNotRunningException {
+    public MasterKeepAliveConnection getMaster() throws IOException {
       if (this.localHostClient instanceof MasterService.BlockingInterface) {
         return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient);
       }
-      return super.getKeepAliveMasterService();
+      return super.getMaster();
     }
   }

HTable.java

@@ -162,10 +162,7 @@ public class HTable implements Table {
       final RpcRetryingCallerFactory rpcCallerFactory,
       final RpcControllerFactory rpcControllerFactory,
       final ExecutorService pool) {
-    if (connection == null || connection.isClosed()) {
-      throw new IllegalArgumentException("Connection is null or closed.");
-    }
-    this.connection = connection;
+    this.connection = Preconditions.checkNotNull(connection, "connection is null");
     this.configuration = connection.getConfiguration();
     this.connConfiguration = connection.getConnectionConfiguration();
     if (pool == null) {

MasterCallable.java

@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 /**
  * A RetryingCallable for Master RPC operations.
  * Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of
@@ -55,7 +54,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   @Override
   public void prepare(boolean reload) throws IOException {
-    this.master = this.connection.getKeepAliveMasterService();
+    this.master = this.connection.getMaster();
   }
   @Override
@@ -139,7 +138,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   }
   private static boolean isMetaRegion(final byte[] regionName) {
-    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+    return Bytes.equals(regionName, RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
+        Bytes.equals(regionName, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
   }
 }

TestSnapshotFromAdmin.java

@@ -92,7 +92,7 @@ public class TestSnapshotFromAdmin {
     // mock the master admin to our mock
     MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class);
     Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
-    Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(mockMaster);
+    Mockito.when(mockConnection.getMaster()).thenReturn(mockMaster);
     // we need a real retrying caller
     RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
     RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
@@ -163,7 +163,7 @@ public class TestSnapshotFromAdmin {
     // mock the master connection
     MasterKeepAliveConnection master = Mockito.mock(MasterKeepAliveConnection.class);
-    Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(master);
+    Mockito.when(mockConnection.getMaster()).thenReturn(master);
     SnapshotResponse response = SnapshotResponse.newBuilder().setExpectedTimeout(0).build();
     Mockito.when(
       master.snapshot((RpcController) Mockito.any(), Mockito.any()))

TestHBaseAdminNoCluster.java

@@ -102,7 +102,7 @@ public class TestHBaseAdminNoCluster {
     Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
       (CreateTableRequest)Mockito.any())).
       thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test")));
-    Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
+    Mockito.when(connection.getMaster()).thenReturn(masterAdmin);
     Admin admin = new HBaseAdmin(connection);
     try {
       HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
@@ -304,7 +304,7 @@ public class TestHBaseAdminNoCluster {
         throw new MasterNotRunningException(); // all methods will throw an exception
       }
     });
-    Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
+    Mockito.when(connection.getMaster()).thenReturn(masterAdmin);
     RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class);
     Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory);
     Mockito.when(rpcControllerFactory.newController()).thenReturn(