HBASE-12558 TestHCM.testClusterStatus Unexpected exception, expected<org.apache.hadoop.hbase.regionserver.RegionServerStoppedException> but was<junit.framework.AssertionFailedError> -- ADDED DEBUG

stack 2014-12-08 15:04:59 -08:00
parent 4b1983c89d
commit 2458be08ed
21 changed files with 130 additions and 1814 deletions

View File

@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -619,7 +620,7 @@ class ConnectionManager {
     this.registry = setupRegistry();
     retrieveClusterId();
-    this.rpcClient = new RpcClient(this.conf, this.clusterId);
+    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     // Do we publish the status?
@@ -639,7 +640,7 @@ class ConnectionManager {
         @Override
         public void newDead(ServerName sn) {
           clearCaches(sn);
-          rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+          rpcClient.cancelConnections(sn);
         }
       }, conf, listenerClass);
   }
@@ -783,18 +784,6 @@ class ConnectionManager {
     return RegistryFactory.getRegistry(this);
   }
-  /**
-   * For tests only.
-   * @param rpcClient Client we should use instead.
-   * @return Previous rpcClient
-   */
-  @VisibleForTesting
-  RpcClient setRpcClient(final RpcClient rpcClient) {
-    RpcClient oldRpcClient = this.rpcClient;
-    this.rpcClient = rpcClient;
-    return oldRpcClient;
-  }
   /**
    * For tests only.
    */
@@ -2336,7 +2325,7 @@ class ConnectionManager {
       clusterStatusListener.close();
     }
     if (rpcClient != null) {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
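Note: the pattern in this file repeats across the rest of the commit — direct new RpcClient(...) construction becomes RpcClientFactory.createClient(...), cancelConnections takes a ServerName instead of a hostname/port pair, and shutdown moves from stop() to close(). The factory itself is not among the files shown in this view; below is a minimal sketch of the shape such a factory commonly takes, assuming the CUSTOM_RPC_CLIENT_IMPL_CONF_KEY constant that the TestClientTimeouts hunks further down rely on. The key's string value and the reflective constructor lookup are assumptions, not code from this commit.

// Hypothetical sketch only; the real RpcClientFactory is not part of this diff.
// Assumes client implementations expose a (Configuration, String, SocketAddress)
// constructor, as the RandomTimeoutRpcClient in TestClientTimeouts suggests.
import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;

public final class RpcClientFactory {
  // Key name is an assumption; only the constant's existence is shown by this diff.
  public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = "hbase.rpc.client.impl";

  private RpcClientFactory() {
  }

  public static RpcClient createClient(Configuration conf, String clusterId) {
    return createClient(conf, clusterId, null);
  }

  public static RpcClient createClient(Configuration conf, String clusterId,
      SocketAddress localAddr) {
    String className = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
        RpcClientImpl.class.getName());
    try {
      // Reflective construction lets tests swap in custom clients via configuration.
      return (RpcClient) Class.forName(className)
          .getConstructor(Configuration.class, String.class, SocketAddress.class)
          .newInstance(conf, clusterId, localAddr);
    } catch (Exception e) {
      throw new RuntimeException("Could not create RPC client " + className, e);
    }
  }
}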

View File

@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;

View File

@@ -51,7 +51,7 @@ import com.google.protobuf.Message;
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
-class IPCUtil {
+public class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
   /**
    * How much we think the decompressor will expand the original compressed content.
@@ -60,7 +60,7 @@ class IPCUtil {
   private final int cellBlockBuildingInitialBufferSize;
   private final Configuration conf;
-  IPCUtil(final Configuration conf) {
+  public IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
@@ -81,14 +81,14 @@ class IPCUtil {
    * <code>compressor</code>.
    * @param codec
    * @param compressor
-   * @Param cellScanner
+   * @param cellScanner
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
    * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
    * flipped and is ready for reading.  Use limit to find total size.
    * @throws IOException
    */
   @SuppressWarnings("resource")
-  ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
   throws IOException {
     if (cellScanner == null) return null;
@@ -145,7 +145,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
     final byte [] cellBlock)
   throws IOException {
     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
@@ -159,7 +159,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
     final byte [] cellBlock, final int offset, final int length)
   throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
@@ -200,7 +200,7 @@ class IPCUtil {
    * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
    * @throws IOException
    */
-  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+  public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
     if (m == null) return null;
     int serializedSize = m.getSerializedSize();
     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
@@ -223,7 +223,7 @@ class IPCUtil {
    * @return Total number of bytes written.
    * @throws IOException
    */
-  static int write(final OutputStream dos, final Message header, final Message param,
+  public static int write(final OutputStream dos, final Message header, final Message param,
     final ByteBuffer cellBlock)
   throws IOException {
     // Must calculate total size and write that first so other side can read it all in in one
@@ -255,7 +255,7 @@ class IPCUtil {
    * @param len
    * @throws IOException
    */
-  static void readChunked(final DataInput in, byte[] dest, int offset, int len)
+  public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
       throws IOException {
     int maxRead = 8192;
@@ -265,11 +265,9 @@ class IPCUtil {
   }
   /**
-   * @param header
-   * @param body
    * @return Size on the wire when the two messages are written with writeDelimitedTo
    */
-  static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
     int totalSize = 0;
     for (Message m: messages) {
       if (m == null) continue;
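Note: with IPCUtil and its cell-block methods now public, callers outside org.apache.hadoop.hbase.ipc can encode and decode cell blocks directly. Below is a minimal round-trip sketch using the signatures shown in the hunks above; KeyValueCodec and the CellUtil/KeyValue helpers are ordinary HBase client classes, and the heap-backed block.array() access is an assumption about the returned buffer.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CellBlockRoundTrip {
  public static void main(String[] args) throws Exception {
    IPCUtil util = new IPCUtil(new Configuration());
    List<Cell> cells = new ArrayList<Cell>();
    cells.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value")));
    // Encode the cells; a null compressor means no compression is applied.
    ByteBuffer block = util.buildCellBlock(new KeyValueCodec(), null,
        CellUtil.createCellScanner(cells));
    // The buffer comes back flipped, so limit() is the total encoded size.
    CellScanner scanner = util.createCellScanner(new KeyValueCodec(), null,
        block.array(), 0, block.limit());
    while (scanner.advance()) {
      System.out.println(scanner.current());
    }
  }
}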

View File

@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -316,7 +316,7 @@ public class MetaTableLocator {
       LOG.debug("Exception connecting to " + sn);
     } catch (UnknownHostException e) {
       LOG.debug("Unknown host exception connecting to " + sn);
-    } catch (RpcClient.FailedServerException e) {
+    } catch (FailedServerException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server " + sn + " is in failed server list.");
       }

View File

@@ -148,7 +148,7 @@ import com.google.protobuf.TextFormat;
  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
  * queue for Responder to pull from and return result to client.
  *
- * @see RpcClient
+ * @see RpcClientImpl
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving

View File

@@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;

View File

@@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -782,10 +782,11 @@ public class HRegionServer extends HasThread implements
     rsQuotaManager = new RegionServerQuotaManager(this);
     // Setup RPC client for master communication
-    rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
+    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
       rpcServices.isa.getAddress(), 0));
-    int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
+    int storefileRefreshPeriod = conf.getInt(
+        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
       , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
     if (storefileRefreshPeriod > 0) {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
@@ -994,7 +995,7 @@ public class HRegionServer extends HasThread implements
       this.rssStub = null;
     }
     if (this.rpcClient != null) {
-      this.rpcClient.stop();
+      this.rpcClient.close();
     }
     if (this.leases != null) {
       this.leases.close();

View File

@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
@@ -65,7 +65,7 @@ public class TestClientScannerRPCTimeout {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = TEST_UTIL.getConfiguration();
     // Don't report so often so easier to see other rpcs

View File

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -61,6 +64,10 @@ public class TestClientTimeouts {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(SLAVES);
+    // Set the custom RPC client with random timeouts as the client
+    TEST_UTIL.getConfiguration().set(
+        RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
+        RandomTimeoutRpcClient.class.getName());
   }
   /**
@@ -81,7 +88,9 @@ public class TestClientTimeouts {
     Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
-    RpcClient rpcClient = newRandomTimeoutRpcClient();
+    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
+        .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
     try {
       for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
         lastFailed = false;
@@ -94,13 +103,6 @@ public class TestClientTimeouts {
           Connection connection = admin.getConnection();
           assertFalse(connection == lastConnection);
           lastConnection = connection;
-          // Override the connection's rpc client for timeout testing
-          RpcClient oldRpcClient =
-            ((ConnectionManager.HConnectionImplementation)connection).setRpcClient(
-              rpcClient);
-          if (oldRpcClient != null) {
-            oldRpcClient.stop();
-          }
           // run some admin commands
           HBaseAdmin.checkHBaseAvailable(conf);
           admin.setBalancerRunning(false, false);
@@ -111,7 +113,8 @@ public class TestClientTimeouts {
         } finally {
           admin.close();
           if (admin.getConnection().isClosed()) {
-            rpcClient = newRandomTimeoutRpcClient();
+            rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
+                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
           }
         }
       }
@@ -119,31 +122,36 @@ public class TestClientTimeouts {
       assertFalse(lastFailed);
       assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
-  private static RpcClient newRandomTimeoutRpcClient() {
-    return new RpcClient(
-        TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()) {
-      // Return my own instance, one that does random timeouts
-      @Override
-      public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
-          User ticket, int rpcTimeout) {
-        return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
-      }
-    };
-  }
+  /**
+   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
+   */
+  public static class RandomTimeoutRpcClient extends RpcClientImpl {
+    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+      super(conf, clusterId, localAddr);
+    }
+    // Return my own instance, one that does random timeouts
+    @Override
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
+        User ticket, int rpcTimeout) {
+      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
+    }
   }
   /**
    * Blocking rpc channel that goes via hbase rpc.
    */
-  static class RandomTimeoutBlockingRpcChannel extends RpcClient.BlockingRpcChannelImplementation {
+  static class RandomTimeoutBlockingRpcChannel
+      extends RpcClientImpl.BlockingRpcChannelImplementation {
     private static final Random RANDOM = new Random(System.currentTimeMillis());
     public static final double CHANCE_OF_TIMEOUT = 0.3;
     private static AtomicInteger invokations = new AtomicInteger();
-    RandomTimeoutBlockingRpcChannel(final RpcClient rpcClient, final ServerName sn,
+    RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
         final User ticket, final int rpcTimeout) {
       super(rpcClient, sn, ticket, rpcTimeout);
     }
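Note: this hunk shows the replacement for the deleted setRpcClient() hook in ConnectionManager — the test registers its client class under RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY and lets every connection pick it up from configuration. Any implementation with the (Configuration, String, SocketAddress) constructor should plug in the same way; a hedged usage sketch follows (MyInstrumentedRpcClient is hypothetical, standing in for any RpcClientImpl subclass).

// Sketch of the plug-in pattern introduced here; MyInstrumentedRpcClient is
// a made-up class name, not part of this commit.
Configuration conf = HBaseConfiguration.create();
conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
    MyInstrumentedRpcClient.class.getName());
RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
try {
  // Channels created from this client now go through the custom subclass.
} finally {
  client.close(); // close(), not stop(), after this commit
}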

View File

@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,7 +97,7 @@ public class TestFromClientSideNoCodec {
   public void testNoCodec() {
     Configuration c = new Configuration();
     c.set("hbase.client.default.rpc.codec", "");
-    String codec = RpcClient.getDefaultCodec(c);
+    String codec = AbstractRpcClient.getDefaultCodec(c);
     assertTrue(codec == null || codec.length() == 0);
   }
 }

View File

@@ -394,7 +394,7 @@ public class TestHCM {
     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
     for (int i = 0; i < 5000; i++) {
-      rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+      rpcClient.cancelConnections(sn);
       Thread.sleep(5);
     }

View File

@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -115,7 +115,7 @@ public class FilterTestingCluster {
   @BeforeClass
   public static void setUp() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     TEST_UTIL.startMiniCluster(1);
     initialize(TEST_UTIL.getConfiguration());

View File

@@ -89,7 +89,8 @@ public class TestDelayedRpc {
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -118,7 +119,7 @@ public class TestDelayedRpc {
       assertEquals(UNDELAYED, results.get(1).intValue());
       assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
@@ -170,7 +171,8 @@ public class TestDelayedRpc {
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -200,7 +202,7 @@ public class TestDelayedRpc {
       log.removeAppender(listAppender);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
@@ -293,7 +295,8 @@ public class TestDelayedRpc {
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -323,7 +326,7 @@ public class TestDelayedRpc {
       }
       assertTrue(caughtException);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
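Note: every call site in this file (and the other tests below) moves from rpcClient.stop() to rpcClient.close(), which suggests the client type now implements java.io.Closeable. If that reading is right — the RpcClient interface itself is not shown in this view — the try/finally blocks above could equally be written with try-with-resources:

// Sketch only: assumes RpcClient is Closeable, which the stop() -> close()
// rename suggests but this diff does not show directly. Mirrors the test
// bodies above, so rpcServer is the already-started TestRpcServer.
Configuration conf = HBaseConfiguration.create();
try (RpcClient rpcClient = RpcClientFactory.createClient(
    conf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
  BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
      ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
          rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
      User.getCurrent(), 1000);
  // ... run the same assertions against a stub bound to this channel ...
} // no explicit finally block needed; close() runs automatically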

View File

@@ -36,7 +36,7 @@ public class TestHBaseClient {
   public void testFailedServer(){
     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge( ee );
-    RpcClient.FailedServers fs = new RpcClient.FailedServers(new Configuration());
+    FailedServers fs = new FailedServers(new Configuration());
     InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
     InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12);  // same server as ia
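Note: FailedServers is now a top-level class in org.apache.hadoop.hbase.ipc rather than an inner class of RpcClient. The test drives its time-based expiry with a ManualEnvironmentEdge; a condensed sketch of that pattern follows (the addToFailedServers/isFailedServer method names and the roughly two-second default expiry window are assumptions based on this era of the API, not shown in the diff).

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.FailedServers;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

public class FailedServersDemo {
  public static void main(String[] args) {
    // Control the clock so the failed-server expiry window is deterministic.
    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(clock);

    FailedServers fs = new FailedServers(new Configuration());
    InetSocketAddress addr = InetSocketAddress.createUnresolved("bad", 12);

    fs.addToFailedServers(addr);
    System.out.println(fs.isFailedServer(addr)); // true while inside the window

    clock.incValue(3000); // jump past the assumed ~2s default expiry
    System.out.println(fs.isFailedServer(addr)); // false once the entry expired
  }
}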

View File

@@ -178,7 +178,7 @@ public class TestIPC {
   @Test
   public void testNoCodec() throws InterruptedException, IOException {
     Configuration conf = HBaseConfiguration.create();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT) {
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
       @Override
       Codec getCodec() {
         return null;
@@ -197,7 +197,7 @@ public class TestIPC {
       // Silly assertion that the message is in the returned pb.
       assertTrue(r.getFirst().toString().contains(message));
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -216,10 +216,10 @@ public class TestIPC {
       throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
     Configuration conf = new Configuration(HBaseConfiguration.create());
     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
-    doSimpleTest(conf, new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT));
+    doSimpleTest(conf, new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT));
   }
-  private void doSimpleTest(final Configuration conf, final RpcClient client)
+  private void doSimpleTest(final Configuration conf, final RpcClientImpl client)
       throws InterruptedException, IOException {
     TestRpcServer rpcServer = new TestRpcServer();
     List<Cell> cells = new ArrayList<Cell>();
@@ -239,7 +239,7 @@ public class TestIPC {
       }
       assertEquals(count, index);
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -258,7 +258,7 @@ public class TestIPC {
     }).when(spyFactory).createSocket();
     TestRpcServer rpcServer = new TestRpcServer();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
     try {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
@@ -270,7 +270,7 @@ public class TestIPC {
       LOG.info("Caught expected exception: " + e.toString());
       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -281,7 +281,7 @@ public class TestIPC {
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
     RpcServer rpcServer = new TestRpcServer(scheduler);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
-    RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
     try {
       rpcServer.start();
       verify(scheduler).start();
@@ -312,7 +312,7 @@ public class TestIPC {
     TestRpcServer rpcServer = new TestRpcServer();
     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
     KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
     Put p = new Put(kv.getRow());
     for (int i = 0; i < cellcount; i++) {
@@ -354,7 +354,7 @@ public class TestIPC {
       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " +
         (System.currentTimeMillis() - startTime) + "ms");
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }

View File

@@ -112,7 +112,7 @@ public class TestProtoBufRpc {
   @Test
   public void testProtoBufRpc() throws Exception {
-    RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
@@ -136,7 +136,7 @@ public class TestProtoBufRpc {
       } catch (ServiceException e) {
       }
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
@@ -53,7 +54,7 @@ public class TestHMasterRPCException {
     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     HMaster hm = new HMaster(conf, cp);
     ServerName sm = hm.getServerName();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
       int i = 0;
       //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
@@ -88,7 +89,7 @@ public class TestHMasterRPCException {
       }
       fail();
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
@@ -99,7 +100,8 @@ public class TestSecureRPC {
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
       isa, conf, new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory
+        .createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -115,7 +117,7 @@ public class TestSecureRPC {
       assertEquals(0xDEADBEEF, results.get(0).intValue());
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -400,7 +401,7 @@ public class TestTokenAuthentication {
     testuser.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws Exception {
         Configuration c = server.getConfiguration();
-        RpcClient rpcClient = new RpcClient(c, clusterId.toString());
+        RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
         ServerName sn =
           ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
             System.currentTimeMillis());
@@ -416,7 +417,7 @@ public class TestTokenAuthentication {
           String authMethod = response.getAuthMethod();
           assertEquals("TOKEN", authMethod);
         } finally {
-          rpcClient.stop();
+          rpcClient.close();
         }
         return null;
       }

View File

@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -88,7 +88,7 @@ public class TestFlushSnapshotFromClient {
     // Uncomment the following lines if more verbosity is needed for
     // debugging (see HBASE-12285 for details).
     //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    //((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     setupConf(UTIL.getConfiguration());
     UTIL.startMiniCluster(NUM_RS);
UTIL.startMiniCluster(NUM_RS); UTIL.startMiniCluster(NUM_RS);