diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 09de8715438..c28f3e62abd 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -168,9 +168,13 @@ public class IntegrationTestRpcClient { TestRpcServer rpcServer = new TestRpcServer(conf); rpcServer.start(); - rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + rpcServers.put(address, rpcServer); serverList.add(rpcServer); - LOG.info("Started server: " + rpcServer.getListenerAddress()); + LOG.info("Started server: " + address); return rpcServer; } finally { lock.writeLock().unlock(); @@ -187,7 +191,13 @@ public class IntegrationTestRpcClient { int size = rpcServers.size(); int rand = random.nextInt(size); rpcServer = serverList.remove(rand); - rpcServers.remove(rpcServer.getListenerAddress()); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + // Throw exception here. We can't remove this instance from the server map because + // we no longer have access to its map key + throw new IOException("Listener channel is closed"); + } + rpcServers.remove(address); if (rpcServer != null) { stopServer(rpcServer); @@ -305,8 +315,12 @@ public class IntegrationTestRpcClient { TestRpcServer server = cluster.getRandomServer(); try { User user = User.getCurrent(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress()); + rpcClient.callBlockingMethod(md, null, param, ret, user, address); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 38b7c91692f..ede4b4ebcd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc; * See the License for the specific language governing permissions and * limitations under the License. */ +import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; @@ -96,8 +97,9 @@ public class CallRunner { TraceScope traceScope = null; try { if (!this.rpcServer.isStarted()) { - throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress() - + " is not running yet"); + InetSocketAddress address = rpcServer.getListenerAddress(); + throw new ServerNotRunningYetException("Server " + + (address != null ? address : "(channel closed)") + " is not running yet"); } if (call.tinfo != null) { traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); @@ -143,9 +145,10 @@ public class CallRunner { throw e; } } catch (ClosedChannelException cce) { + InetSocketAddress address = rpcServer.getListenerAddress(); RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + - "this means that the server " + rpcServer.getListenerAddress() + " was processing a " + - "request but the client went away. The error message was: " + + "this means that the server " + (address != null ? address : "(channel closed)") + + " was processing a request but the client went away. The error message was: " + cce.getMessage()); } catch (Exception e) { RpcServer.LOG.warn(Thread.currentThread().getName() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2b678171b18..72ebda25394 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1797,8 +1797,9 @@ public class RpcServer implements RpcServerInterface { responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + getListenerAddress() + + "Call queue is full on " + (address != null ? address : "(channel closed)") + ", is hbase.ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); return; @@ -1826,8 +1827,9 @@ public class RpcServer implements RpcServerInterface { buf, offset, buf.length); } } catch (Throwable t) { - String msg = getListenerAddress() + " is unable to read call parameter from client " + - getHostAddress(); + InetSocketAddress address = getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); metrics.exception(t); @@ -2252,11 +2254,16 @@ public class RpcServer implements RpcServerInterface { } /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. + * Return the socket (ip+port) on which the RPC server is listening to. May return null if + * the listener channel is closed. + * @return the socket (ip+port) on which the RPC server is listening to, or null if this + * information cannot be determined */ @Override public synchronized InetSocketAddress getListenerAddress() { + if (listener == null) { + return null; + } return listener.getAddress(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 882269628a3..47b373aebe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -890,8 +890,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } // Set our address, however we need the final port that was given to rpcServer - isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index d427419835b..dffd8e9b3cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -159,10 +159,13 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } Pair r = client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); @@ -200,12 +203,14 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } Pair r = client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); @@ -231,9 +236,12 @@ public abstract class AbstractTestIPC { AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } client.call(null, md, param, null, User.getCurrent(), address, new MetricsConnection.CallStats()); fail("Expected an exception to have been thrown!"); @@ -258,10 +266,14 @@ public abstract class AbstractTestIPC { verify(scheduler).start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < 10; i++) { client.call(new PayloadCarryingRpcController( CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), + md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index 8921e895903..d9b3e494e90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -156,6 +156,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -192,6 +195,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -257,6 +263,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } long startTime = System.currentTimeMillis(); User user = User.getCurrent(); for (int i = 0; i < cycles; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index deee7170a3b..41ee4cdd573 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -91,9 +91,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -173,9 +176,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -297,9 +303,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 227f91aac08..d3dbd33e5af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -125,9 +125,12 @@ public class TestIPC extends AbstractTestIPC { rm.add(p); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); long startTime = System.currentTimeMillis(); User user = User.getCurrent(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < cycles; i++) { List cells = new ArrayList(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index fc2734ff744..ffb3927193f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -100,7 +100,11 @@ public class TestProtoBufRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); - this.isa = server.getListenerAddress(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.server.start(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index dbf119dfa36..03e9e4ec9ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -179,9 +179,12 @@ public class TestRpcHandlerException { EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); - + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - rpcServer.getListenerAddress(), new MetricsConnection.CallStats()); + address, new MetricsConnection.CallStats()); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java index 4ad2c310de3..9a48db7f953 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing { @Override protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException { if (firstRS.getAndSet(false)) { + InetSocketAddress address = super.getRpcServer().getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (NameStringPair e : c.getMapEntriesList()) { String key = e.getName(); // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - assertEquals(super.getRpcServer().getListenerAddress().getHostName(), - hostnameFromMasterPOV); + assertEquals(address.getHostName(), hostnameFromMasterPOV); } } while (!masterActive) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java index b4dd62bcaff..a9404084161 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java @@ -142,11 +142,14 @@ public class TestSecureRPC { RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer - .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(), - 5000); + ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 5000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); List results = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 423cd1d1884..50bc5893482 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -141,7 +141,11 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.BlockingInterface.class)); this.rpcServer = new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); - this.isa = this.rpcServer.getListenerAddress(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.sleeper = new Sleeper(1000, this); }