HBASE-14526 Remove delayed rpc
This commit is contained in:
parent
5728416419
commit
70fb2e5aab
|
@ -863,7 +863,6 @@
|
|||
<include>DummyRegionServerEndpoint.proto</include>
|
||||
<include>TestProcedure.proto</include>
|
||||
<include>test.proto</include>
|
||||
<include>test_delayed_rpc.proto</include>
|
||||
<include>test_rpc_service.proto</include>
|
||||
</includes>
|
||||
</source>
|
||||
|
|
|
@ -125,13 +125,10 @@ public class CallRunner {
|
|||
sucessful = true;
|
||||
}
|
||||
}
|
||||
// Set the response for undelayed calls and delayed calls with
|
||||
// undelayed responses.
|
||||
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
|
||||
Message param = resultPair != null ? resultPair.getFirst() : null;
|
||||
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
|
||||
call.setResponse(param, cells, errorThrowable, error);
|
||||
}
|
||||
// Set the response
|
||||
Message param = resultPair != null ? resultPair.getFirst() : null;
|
||||
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
|
||||
call.setResponse(param, cells, errorThrowable, error);
|
||||
call.sendResponseIfReady();
|
||||
this.status.markComplete("Sent response");
|
||||
this.status.pause("Waiting for a call");
|
||||
|
|
|
@ -1,73 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A call whose response can be delayed by the server.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface Delayable {
|
||||
/**
|
||||
* Signal that the call response should be delayed, thus freeing the RPC
|
||||
* server to handle different requests.
|
||||
*
|
||||
* @param delayReturnValue Controls whether the return value of the call
|
||||
* should be set when ending the delay or right away. There are cases when
|
||||
* the return value can be set right away, even if the call is delayed.
|
||||
*/
|
||||
void startDelay(boolean delayReturnValue);
|
||||
|
||||
/**
|
||||
* @return is the call delayed?
|
||||
*/
|
||||
boolean isDelayed();
|
||||
|
||||
/**
|
||||
* @return is the return value delayed?
|
||||
*/
|
||||
boolean isReturnValueDelayed();
|
||||
|
||||
/**
|
||||
* Signal that the RPC server is now allowed to send the response.
|
||||
* @param result The value to return to the caller. If the corresponding
|
||||
* delay response specified that the return value should
|
||||
* not be delayed, this parameter must be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
void endDelay(Object result) throws IOException;
|
||||
|
||||
/**
|
||||
* Signal the end of a delayed RPC, without specifying the return value. Use
|
||||
* this only if the return value was not delayed
|
||||
* @throws IOException
|
||||
*/
|
||||
void endDelay() throws IOException;
|
||||
|
||||
/**
|
||||
* End the call, throwing and exception to the caller. This works regardless
|
||||
* of the return value being delayed.
|
||||
* @param t Object to throw to the client.
|
||||
* @throws IOException
|
||||
*/
|
||||
void endDelayThrowing(Throwable t) throws IOException;
|
||||
}
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface RpcCallContext extends Delayable {
|
||||
public interface RpcCallContext {
|
||||
/**
|
||||
* Check if the caller who made this IPC call has disconnected.
|
||||
* If called from outside the context of IPC, this does nothing.
|
||||
|
|
|
@ -186,13 +186,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
*/
|
||||
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
|
||||
|
||||
private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
|
||||
|
||||
private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
|
||||
|
||||
private final int warnDelayedCalls;
|
||||
|
||||
private AtomicInteger delayedCalls;
|
||||
private final IPCUtil ipcUtil;
|
||||
|
||||
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
||||
|
@ -305,10 +298,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
* Chain of buffers to send as response.
|
||||
*/
|
||||
protected BufferChain response;
|
||||
protected boolean delayResponse;
|
||||
protected Responder responder;
|
||||
protected boolean delayReturnValue; // if the return value should be
|
||||
// set at call completion
|
||||
|
||||
protected long size; // size of current call
|
||||
protected boolean isError;
|
||||
protected TraceInfo tinfo;
|
||||
|
@ -335,7 +326,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.connection = connection;
|
||||
this.timestamp = System.currentTimeMillis();
|
||||
this.response = null;
|
||||
this.delayResponse = false;
|
||||
this.responder = responder;
|
||||
this.isError = false;
|
||||
this.size = size;
|
||||
|
@ -476,51 +466,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
return new BufferChain(bbTokenLength, bbTokenBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void endDelay(Object result) throws IOException {
|
||||
assert this.delayResponse;
|
||||
assert this.delayReturnValue || result == null;
|
||||
this.delayResponse = false;
|
||||
delayedCalls.decrementAndGet();
|
||||
if (this.delayReturnValue) {
|
||||
this.setResponse(result, null, null, null);
|
||||
}
|
||||
this.responder.doRespond(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void endDelay() throws IOException {
|
||||
this.endDelay(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void startDelay(boolean delayReturnValue) {
|
||||
assert !this.delayResponse;
|
||||
this.delayResponse = true;
|
||||
this.delayReturnValue = delayReturnValue;
|
||||
int numDelayed = delayedCalls.incrementAndGet();
|
||||
if (numDelayed > warnDelayedCalls) {
|
||||
LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void endDelayThrowing(Throwable t) throws IOException {
|
||||
this.setResponse(null, null, t, StringUtils.stringifyException(t));
|
||||
this.delayResponse = false;
|
||||
this.sendResponseIfReady();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isDelayed() {
|
||||
return this.delayResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isReturnValueDelayed() {
|
||||
return this.delayReturnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClientCellBlockSupported() {
|
||||
return this.connection != null && this.connection.codec != null;
|
||||
|
@ -557,15 +502,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
responseBlockSize += blockSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* If we have a response, and delay is not set, then respond
|
||||
* immediately. Otherwise, do not respond to client. This is
|
||||
* called by the RPC code in the context of the Handler thread.
|
||||
*/
|
||||
public synchronized void sendResponseIfReady() throws IOException {
|
||||
if (!this.delayResponse) {
|
||||
this.responder.doRespond(this);
|
||||
}
|
||||
this.responder.doRespond(this);
|
||||
}
|
||||
|
||||
public UserGroupInformation getRemoteUser() {
|
||||
|
@ -2081,8 +2019,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
|
||||
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
|
||||
|
||||
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
|
||||
this.delayedCalls = new AtomicInteger(0);
|
||||
this.ipcUtil = new IPCUtil(conf);
|
||||
|
||||
|
||||
|
|
|
@ -1,366 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Test that delayed RPCs work. Fire up three calls, the first of which should
|
||||
* be delayed. Check that the last two, which are undelayed, return before the
|
||||
* first one.
|
||||
*/
|
||||
@Category(MediumTests.class) // Fails sometimes with small tests
|
||||
public class TestDelayedRpc {
|
||||
private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
|
||||
public static RpcServerInterface rpcServer;
|
||||
public static final int UNDELAYED = 0;
|
||||
public static final int DELAYED = 1;
|
||||
private static final int RPC_CLIENT_TIMEOUT = 30000;
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testDelayedRpcImmediateReturnValue() throws Exception {
|
||||
testDelayedRpc(false);
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testDelayedRpcDelayedReturnValue() throws Exception {
|
||||
testDelayedRpc(true);
|
||||
}
|
||||
|
||||
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
|
||||
LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
||||
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
||||
BlockingService service =
|
||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
||||
rpcServer = new RpcServer(null, "testDelayedRpc",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
||||
isa,
|
||||
conf,
|
||||
new FifoRpcScheduler(conf, 1));
|
||||
rpcServer.start();
|
||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
List<Integer> results = new ArrayList<Integer>();
|
||||
// Setting true sets 'delayed' on the client.
|
||||
TestThread th1 = new TestThread(stub, true, results);
|
||||
// Setting 'false' means we will return UNDELAYED as response immediately.
|
||||
TestThread th2 = new TestThread(stub, false, results);
|
||||
TestThread th3 = new TestThread(stub, false, results);
|
||||
th1.start();
|
||||
Thread.sleep(100);
|
||||
th2.start();
|
||||
Thread.sleep(200);
|
||||
th3.start();
|
||||
|
||||
th1.join();
|
||||
th2.join();
|
||||
th3.join();
|
||||
|
||||
// We should get the two undelayed responses first.
|
||||
assertEquals(UNDELAYED, results.get(0).intValue());
|
||||
assertEquals(UNDELAYED, results.get(1).intValue());
|
||||
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
|
||||
} finally {
|
||||
rpcClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ListAppender extends AppenderSkeleton {
|
||||
private final List<String> messages = new ArrayList<String>();
|
||||
|
||||
@Override
|
||||
protected void append(LoggingEvent event) {
|
||||
messages.add(event.getMessage().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requiresLayout() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public List<String> getMessages() {
|
||||
return messages;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that we see a WARN message in the logs.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testTooManyDelayedRpcs() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
final int MAX_DELAYED_RPC = 10;
|
||||
conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
|
||||
// Set up an appender to catch the "Too many delayed calls" that we expect.
|
||||
ListAppender listAppender = new ListAppender();
|
||||
Logger log = Logger.getLogger(RpcServer.class);
|
||||
log.addAppender(listAppender);
|
||||
log.setLevel(Level.WARN);
|
||||
|
||||
|
||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
||||
TestDelayedImplementation instance = new TestDelayedImplementation(true);
|
||||
BlockingService service =
|
||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
||||
rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
||||
isa,
|
||||
conf,
|
||||
new FifoRpcScheduler(conf, 1));
|
||||
rpcServer.start();
|
||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
|
||||
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
|
||||
threads[i] = new TestThread(stub, true, null);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
/* No warnings till here. */
|
||||
assertTrue(listAppender.getMessages().isEmpty());
|
||||
|
||||
/* This should give a warning. */
|
||||
threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
|
||||
threads[MAX_DELAYED_RPC].start();
|
||||
|
||||
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
assertFalse(listAppender.getMessages().isEmpty());
|
||||
assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
|
||||
|
||||
log.removeAppender(listAppender);
|
||||
} finally {
|
||||
rpcClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestDelayedImplementation
|
||||
implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
|
||||
/**
|
||||
* Should the return value of delayed call be set at the end of the delay
|
||||
* or at call return.
|
||||
*/
|
||||
private final boolean delayReturnValue;
|
||||
|
||||
/**
|
||||
* @param delayReturnValue Should the response to the delayed call be set
|
||||
* at the start or the end of the delay.
|
||||
*/
|
||||
public TestDelayedImplementation(boolean delayReturnValue) {
|
||||
this.delayReturnValue = delayReturnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestResponse test(final RpcController rpcController, final TestArg testArg)
|
||||
throws ServiceException {
|
||||
boolean delay = testArg.getDelay();
|
||||
TestResponse.Builder responseBuilder = TestResponse.newBuilder();
|
||||
if (!delay) {
|
||||
responseBuilder.setResponse(UNDELAYED);
|
||||
return responseBuilder.build();
|
||||
}
|
||||
final Delayable call = RpcServer.getCurrentCall();
|
||||
call.startDelay(delayReturnValue);
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
TestResponse.Builder responseBuilder = TestResponse.newBuilder();
|
||||
call.endDelay(delayReturnValue ?
|
||||
responseBuilder.setResponse(DELAYED).build() : null);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
// This value should go back to client only if the response is set
|
||||
// immediately at delay time.
|
||||
responseBuilder.setResponse(0xDEADBEEF);
|
||||
return responseBuilder.build();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestThread extends Thread {
|
||||
private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
|
||||
private final boolean delay;
|
||||
private final List<Integer> results;
|
||||
|
||||
public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
|
||||
boolean delay, List<Integer> results) {
|
||||
this.stub = stub;
|
||||
this.delay = delay;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Integer result;
|
||||
try {
|
||||
result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
|
||||
getResponse());
|
||||
} catch (ServiceException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (results != null) {
|
||||
synchronized (results) {
|
||||
results.add(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndDelayThrowing() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
||||
FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
|
||||
BlockingService service =
|
||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
||||
rpcServer = new RpcServer(null, "testEndDelayThrowing",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
||||
isa,
|
||||
conf,
|
||||
new FifoRpcScheduler(conf, 1));
|
||||
rpcServer.start();
|
||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
||||
User.getCurrent(), 1000);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
|
||||
int result = 0xDEADBEEF;
|
||||
|
||||
try {
|
||||
result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
|
||||
} catch (Exception e) {
|
||||
fail("No exception should have been thrown.");
|
||||
}
|
||||
assertEquals(result, UNDELAYED);
|
||||
|
||||
boolean caughtException = false;
|
||||
try {
|
||||
result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
|
||||
} catch(Exception e) {
|
||||
// Exception thrown by server is enclosed in a RemoteException.
|
||||
if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
|
||||
caughtException = true;
|
||||
}
|
||||
LOG.warn("Caught exception, expected=" + caughtException);
|
||||
}
|
||||
assertTrue(caughtException);
|
||||
} finally {
|
||||
rpcClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delayed calls to this class throw an exception.
|
||||
*/
|
||||
private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
|
||||
public FaultyTestDelayedImplementation() {
|
||||
super(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestResponse test(RpcController rpcController, TestArg arg)
|
||||
throws ServiceException {
|
||||
LOG.info("In faulty test, delay=" + arg.getDelay());
|
||||
if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
|
||||
Delayable call = RpcServer.getCurrentCall();
|
||||
call.startDelay(true);
|
||||
LOG.info("In faulty test, delaying");
|
||||
try {
|
||||
call.endDelayThrowing(new Exception("Something went wrong"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// Client will receive the Exception, not this value.
|
||||
return TestResponse.newBuilder().setResponse(DELAYED).build();
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -31,22 +31,28 @@ import java.net.InetSocketAddress;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
|
||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -69,6 +75,55 @@ public class TestSecureRPC {
|
|||
private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
|
||||
.getPath());
|
||||
|
||||
static final BlockingService SERVICE =
|
||||
TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
|
||||
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto ping(RpcController controller,
|
||||
TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto error(RpcController controller,
|
||||
TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EchoResponseProto echo(RpcController controller,
|
||||
TestProtos.EchoRequestProto request)
|
||||
throws ServiceException {
|
||||
if (controller instanceof PayloadCarryingRpcController) {
|
||||
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
|
||||
// If cells, scan them to check we are able to iterate what we were given and since
|
||||
// this is
|
||||
// an echo, just put them back on the controller creating a new block. Tests our
|
||||
// block
|
||||
// building.
|
||||
CellScanner cellScanner = pcrc.cellScanner();
|
||||
List<Cell> list = null;
|
||||
if (cellScanner != null) {
|
||||
list = new ArrayList<Cell>();
|
||||
try {
|
||||
while (cellScanner.advance()) {
|
||||
list.add(cellScanner.current());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
cellScanner = CellUtil.createCellScanner(list);
|
||||
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
|
||||
}
|
||||
return TestProtos.EchoResponseProto.newBuilder()
|
||||
.setMessage(request.getMessage()).build();
|
||||
}
|
||||
});
|
||||
|
||||
private static MiniKdc KDC;
|
||||
|
||||
private static String HOST = "localhost";
|
||||
|
@ -153,40 +208,34 @@ public class TestSecureRPC {
|
|||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||
SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
|
||||
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
||||
|
||||
boolean delayReturnValue = false;
|
||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
||||
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
||||
BlockingService service =
|
||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
||||
|
||||
RpcServerInterface rpcServer =
|
||||
new RpcServer(null, "testSecuredDelayedRpc",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
|
||||
new RpcServer(null, "AbstractTestSecureIPC",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
|
||||
conf, new FifoRpcScheduler(conf, 1));
|
||||
rpcServer.start();
|
||||
RpcClient rpcClient =
|
||||
RpcClientFactory.createClient(clientConfCopy, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
|
||||
HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
BlockingRpcChannel channel =
|
||||
rpcClient.createBlockingRpcChannel(
|
||||
|
||||
ServerName.valueOf(address.getHostName(), address.getPort(),
|
||||
System.currentTimeMillis()), clientUser, 5000);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
List<Integer> results = new ArrayList<Integer>();
|
||||
TestThread th1 = new TestThread(stub, true, results);
|
||||
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
||||
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
|
||||
List<String> results = new ArrayList<String>();
|
||||
TestThread th1 = new TestThread(stub, results);
|
||||
th1.start();
|
||||
th1.join();
|
||||
|
||||
assertEquals(0xDEADBEEF, results.get(0).intValue());
|
||||
} finally {
|
||||
rpcClient.close();
|
||||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
@ -212,4 +261,31 @@ public class TestSecureRPC {
|
|||
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
||||
callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
|
||||
}
|
||||
|
||||
public static class TestThread extends Thread {
|
||||
private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
|
||||
|
||||
private final List<String> results;
|
||||
|
||||
public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
|
||||
this.stub = stub;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String result;
|
||||
try {
|
||||
result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
|
||||
ThreadLocalRandom.current().nextInt())).build()).getMessage();
|
||||
} catch (ServiceException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (results != null) {
|
||||
synchronized (results) {
|
||||
results.add(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
|
||||
option java_outer_classname = "TestDelayedRpcProtos";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
|
||||
|
||||
message TestArg {
|
||||
required bool delay = 1;
|
||||
}
|
||||
|
||||
message TestResponse {
|
||||
required int32 response = 1;
|
||||
}
|
||||
|
||||
service TestDelayedService {
|
||||
rpc test(TestArg) returns (TestResponse);
|
||||
}
|
Loading…
Reference in New Issue