HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
This commit is contained in:
parent
928dade1da
commit
efb82957da
|
@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
|
|||
|
||||
TestRpcServer rpcServer = new TestRpcServer(conf);
|
||||
rpcServer.start();
|
||||
rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
rpcServers.put(address, rpcServer);
|
||||
serverList.add(rpcServer);
|
||||
LOG.info("Started server: " + rpcServer.getListenerAddress());
|
||||
LOG.info("Started server: " + address);
|
||||
return rpcServer;
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
|
@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
|
|||
int size = rpcServers.size();
|
||||
int rand = random.nextInt(size);
|
||||
rpcServer = serverList.remove(rand);
|
||||
rpcServers.remove(rpcServer.getListenerAddress());
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
// Throw exception here. We can't remove this instance from the server map because
|
||||
// we no longer have access to its map key
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
rpcServers.remove(address);
|
||||
|
||||
if (rpcServer != null) {
|
||||
stopServer(rpcServer);
|
||||
|
@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
|
|||
TestRpcServer server = cluster.getRandomServer();
|
||||
try {
|
||||
User user = User.getCurrent();
|
||||
InetSocketAddress address = server.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
ret = (EchoResponseProto)
|
||||
rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
|
||||
rpcClient.callBlockingMethod(md, null, param, ret, user, address);
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e);
|
||||
continue; // expected in case connection is closing or closed
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -96,8 +97,9 @@ public class CallRunner {
|
|||
TraceScope traceScope = null;
|
||||
try {
|
||||
if (!this.rpcServer.isStarted()) {
|
||||
throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
|
||||
+ " is not running yet");
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
throw new ServerNotRunningYetException("Server " +
|
||||
(address != null ? address : "(channel closed)") + " is not running yet");
|
||||
}
|
||||
if (call.tinfo != null) {
|
||||
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
|
||||
|
@ -143,9 +145,10 @@ public class CallRunner {
|
|||
throw e;
|
||||
}
|
||||
} catch (ClosedChannelException cce) {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
|
||||
"this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
|
||||
"request but the client went away. The error message was: " +
|
||||
"this means that the server " + (address != null ? address : "(channel closed)") +
|
||||
" was processing a request but the client went away. The error message was: " +
|
||||
cce.getMessage());
|
||||
} catch (Exception e) {
|
||||
RpcServer.LOG.warn(Thread.currentThread().getName()
|
||||
|
|
|
@ -1813,8 +1813,9 @@ public class RpcServer implements RpcServerInterface {
|
|||
responder, totalRequestSize, null, null);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
|
||||
InetSocketAddress address = getListenerAddress();
|
||||
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
|
||||
"Call queue is full on " + getListenerAddress() +
|
||||
"Call queue is full on " + (address != null ? address : "(channel closed)") +
|
||||
", is hbase.ipc.server.max.callqueue.size too small?");
|
||||
responder.doRespond(callTooBig);
|
||||
return;
|
||||
|
@ -1842,8 +1843,9 @@ public class RpcServer implements RpcServerInterface {
|
|||
buf, offset, buf.length);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
String msg = getListenerAddress() + " is unable to read call parameter from client " +
|
||||
getHostAddress();
|
||||
InetSocketAddress address = getListenerAddress();
|
||||
String msg = (address != null ? address : "(channel closed)") +
|
||||
" is unable to read call parameter from client " + getHostAddress();
|
||||
LOG.warn(msg, t);
|
||||
|
||||
metrics.exception(t);
|
||||
|
@ -2266,11 +2268,16 @@ public class RpcServer implements RpcServerInterface {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the socket (ip+port) on which the RPC server is listening to.
|
||||
* @return the socket (ip+port) on which the RPC server is listening to.
|
||||
* Return the socket (ip+port) on which the RPC server is listening to. May return null if
|
||||
* the listener channel is closed.
|
||||
* @return the socket (ip+port) on which the RPC server is listening to, or null if this
|
||||
* information cannot be determined
|
||||
*/
|
||||
@Override
|
||||
public synchronized InetSocketAddress getListenerAddress() {
|
||||
if (listener == null) {
|
||||
return null;
|
||||
}
|
||||
return listener.getAddress();
|
||||
}
|
||||
|
||||
|
|
|
@ -972,8 +972,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
|
||||
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
|
||||
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
// Set our address, however we need the final port that was given to rpcServer
|
||||
isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
|
||||
isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
|
||||
rpcServer.setErrorHandler(this);
|
||||
rs.setName(name);
|
||||
}
|
||||
|
|
|
@ -159,10 +159,13 @@ public abstract class AbstractTestIPC {
|
|||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
final String message = "hello";
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
Pair<Message, CellScanner> r =
|
||||
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
|
||||
new MetricsConnection.CallStats());
|
||||
|
@ -200,12 +203,14 @@ public abstract class AbstractTestIPC {
|
|||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
|
||||
PayloadCarryingRpcController pcrc =
|
||||
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
Pair<Message, CellScanner> r =
|
||||
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
|
||||
new MetricsConnection.CallStats());
|
||||
|
@ -231,9 +236,12 @@ public abstract class AbstractTestIPC {
|
|||
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
|
||||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
client.call(null, md, param, null, User.getCurrent(), address,
|
||||
new MetricsConnection.CallStats());
|
||||
fail("Expected an exception to have been thrown!");
|
||||
|
@ -258,10 +266,14 @@ public abstract class AbstractTestIPC {
|
|||
verify(scheduler).start();
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
client.call(new PayloadCarryingRpcController(
|
||||
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
|
||||
md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
|
||||
md.getOutputType().toProto(), User.getCurrent(), address,
|
||||
new MetricsConnection.CallStats());
|
||||
}
|
||||
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
|
||||
|
|
|
@ -157,6 +157,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
|
|||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
|
||||
|
@ -193,6 +196,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
|
|||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
|
||||
|
@ -258,6 +264,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
|
|||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
long startTime = System.currentTimeMillis();
|
||||
User user = User.getCurrent();
|
||||
for (int i = 0; i < cycles; i++) {
|
||||
|
|
|
@ -92,9 +92,12 @@ public class TestDelayedRpc {
|
|||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
|
||||
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
|
@ -174,9 +177,12 @@ public class TestDelayedRpc {
|
|||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
|
||||
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
|
@ -298,9 +304,12 @@ public class TestDelayedRpc {
|
|||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
|
||||
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), 1000);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
|
|
|
@ -122,9 +122,12 @@ public class TestIPC extends AbstractTestIPC {
|
|||
rm.add(p);
|
||||
try {
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
long startTime = System.currentTimeMillis();
|
||||
User user = User.getCurrent();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
for (int i = 0; i < cycles; i++) {
|
||||
List<CellScannable> cells = new ArrayList<CellScannable>();
|
||||
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
|
||||
|
|
|
@ -101,7 +101,11 @@ public class TestProtoBufRpc {
|
|||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
||||
new InetSocketAddress(ADDRESS, PORT), conf,
|
||||
new FifoRpcScheduler(conf, 10));
|
||||
this.isa = server.getListenerAddress();
|
||||
InetSocketAddress address = server.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
this.isa = address;
|
||||
this.server.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -180,9 +180,12 @@ public class TestRpcHandlerException {
|
|||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
PayloadCarryingRpcController controller =
|
||||
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
|
||||
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
|
||||
rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
|
||||
address, new MetricsConnection.CallStats());
|
||||
} catch (Throwable e) {
|
||||
assert(abortable.isAborted() == true);
|
||||
} finally {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -115,13 +116,16 @@ public class TestRSKilledWhenInitializing {
|
|||
@Override
|
||||
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
|
||||
if (firstRS.getAndSet(false)) {
|
||||
InetSocketAddress address = super.getRpcServer().getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
for (NameStringPair e : c.getMapEntriesList()) {
|
||||
String key = e.getName();
|
||||
// The hostname the master sees us as.
|
||||
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
|
||||
String hostnameFromMasterPOV = e.getValue();
|
||||
assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
|
||||
hostnameFromMasterPOV);
|
||||
assertEquals(address.getHostName(), hostnameFromMasterPOV);
|
||||
}
|
||||
}
|
||||
while (!masterActive) {
|
||||
|
|
|
@ -143,11 +143,14 @@ public class TestSecureRPC {
|
|||
RpcClient rpcClient =
|
||||
RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel =
|
||||
rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
|
||||
.getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
|
||||
5000);
|
||||
ServerName.valueOf(address.getHostName(), address.getPort(),
|
||||
System.currentTimeMillis()), User.getCurrent(), 5000);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
List<Integer> results = new ArrayList<Integer>();
|
||||
|
|
|
@ -141,7 +141,11 @@ public class TestTokenAuthentication {
|
|||
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
|
||||
this.rpcServer =
|
||||
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
|
||||
this.isa = this.rpcServer.getListenerAddress();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
this.isa = address;
|
||||
this.sleeper = new Sleeper(1000, this);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue