HBASE-14526 Remove delayed rpc
This commit is contained in:
parent
cb539917a2
commit
de62ad150d
|
@ -858,7 +858,6 @@
|
||||||
<include>DummyRegionServerEndpoint.proto</include>
|
<include>DummyRegionServerEndpoint.proto</include>
|
||||||
<include>TestProcedure.proto</include>
|
<include>TestProcedure.proto</include>
|
||||||
<include>test.proto</include>
|
<include>test.proto</include>
|
||||||
<include>test_delayed_rpc.proto</include>
|
|
||||||
<include>test_rpc_service.proto</include>
|
<include>test_rpc_service.proto</include>
|
||||||
</includes>
|
</includes>
|
||||||
</source>
|
</source>
|
||||||
|
|
|
@ -125,13 +125,10 @@ public class CallRunner {
|
||||||
sucessful = true;
|
sucessful = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Set the response for undelayed calls and delayed calls with
|
// Set the response
|
||||||
// undelayed responses.
|
Message param = resultPair != null ? resultPair.getFirst() : null;
|
||||||
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
|
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
|
||||||
Message param = resultPair != null ? resultPair.getFirst() : null;
|
call.setResponse(param, cells, errorThrowable, error);
|
||||||
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
|
|
||||||
call.setResponse(param, cells, errorThrowable, error);
|
|
||||||
}
|
|
||||||
call.sendResponseIfReady();
|
call.sendResponseIfReady();
|
||||||
this.status.markComplete("Sent response");
|
this.status.markComplete("Sent response");
|
||||||
this.status.pause("Waiting for a call");
|
this.status.pause("Waiting for a call");
|
||||||
|
|
|
@ -1,73 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A call whose response can be delayed by the server.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public interface Delayable {
|
|
||||||
/**
|
|
||||||
* Signal that the call response should be delayed, thus freeing the RPC
|
|
||||||
* server to handle different requests.
|
|
||||||
*
|
|
||||||
* @param delayReturnValue Controls whether the return value of the call
|
|
||||||
* should be set when ending the delay or right away. There are cases when
|
|
||||||
* the return value can be set right away, even if the call is delayed.
|
|
||||||
*/
|
|
||||||
void startDelay(boolean delayReturnValue);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return is the call delayed?
|
|
||||||
*/
|
|
||||||
boolean isDelayed();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return is the return value delayed?
|
|
||||||
*/
|
|
||||||
boolean isReturnValueDelayed();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Signal that the RPC server is now allowed to send the response.
|
|
||||||
* @param result The value to return to the caller. If the corresponding
|
|
||||||
* delay response specified that the return value should
|
|
||||||
* not be delayed, this parameter must be null.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void endDelay(Object result) throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Signal the end of a delayed RPC, without specifying the return value. Use
|
|
||||||
* this only if the return value was not delayed
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void endDelay() throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End the call, throwing and exception to the caller. This works regardless
|
|
||||||
* of the return value being delayed.
|
|
||||||
* @param t Object to throw to the client.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void endDelayThrowing(Throwable t) throws IOException;
|
|
||||||
}
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface RpcCallContext extends Delayable {
|
public interface RpcCallContext {
|
||||||
/**
|
/**
|
||||||
* Check if the caller who made this IPC call has disconnected.
|
* Check if the caller who made this IPC call has disconnected.
|
||||||
* If called from outside the context of IPC, this does nothing.
|
* If called from outside the context of IPC, this does nothing.
|
||||||
|
|
|
@ -185,13 +185,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
*/
|
*/
|
||||||
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
|
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
|
||||||
|
|
||||||
private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
|
|
||||||
|
|
||||||
private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
|
|
||||||
|
|
||||||
private final int warnDelayedCalls;
|
|
||||||
|
|
||||||
private AtomicInteger delayedCalls;
|
|
||||||
private final IPCUtil ipcUtil;
|
private final IPCUtil ipcUtil;
|
||||||
|
|
||||||
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
||||||
|
@ -305,10 +298,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
* Chain of buffers to send as response.
|
* Chain of buffers to send as response.
|
||||||
*/
|
*/
|
||||||
protected BufferChain response;
|
protected BufferChain response;
|
||||||
protected boolean delayResponse;
|
|
||||||
protected Responder responder;
|
protected Responder responder;
|
||||||
protected boolean delayReturnValue; // if the return value should be
|
|
||||||
// set at call completion
|
|
||||||
protected long size; // size of current call
|
protected long size; // size of current call
|
||||||
protected boolean isError;
|
protected boolean isError;
|
||||||
protected TraceInfo tinfo;
|
protected TraceInfo tinfo;
|
||||||
|
@ -336,7 +327,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.timestamp = System.currentTimeMillis();
|
this.timestamp = System.currentTimeMillis();
|
||||||
this.response = null;
|
this.response = null;
|
||||||
this.delayResponse = false;
|
|
||||||
this.responder = responder;
|
this.responder = responder;
|
||||||
this.isError = false;
|
this.isError = false;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
|
@ -486,51 +476,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
return new BufferChain(bbTokenLength, bbTokenBytes);
|
return new BufferChain(bbTokenLength, bbTokenBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void endDelay(Object result) throws IOException {
|
|
||||||
assert this.delayResponse;
|
|
||||||
assert this.delayReturnValue || result == null;
|
|
||||||
this.delayResponse = false;
|
|
||||||
delayedCalls.decrementAndGet();
|
|
||||||
if (this.delayReturnValue) {
|
|
||||||
this.setResponse(result, null, null, null);
|
|
||||||
}
|
|
||||||
this.responder.doRespond(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void endDelay() throws IOException {
|
|
||||||
this.endDelay(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void startDelay(boolean delayReturnValue) {
|
|
||||||
assert !this.delayResponse;
|
|
||||||
this.delayResponse = true;
|
|
||||||
this.delayReturnValue = delayReturnValue;
|
|
||||||
int numDelayed = delayedCalls.incrementAndGet();
|
|
||||||
if (numDelayed > warnDelayedCalls) {
|
|
||||||
LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void endDelayThrowing(Throwable t) throws IOException {
|
|
||||||
this.setResponse(null, null, t, StringUtils.stringifyException(t));
|
|
||||||
this.delayResponse = false;
|
|
||||||
this.sendResponseIfReady();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized boolean isDelayed() {
|
|
||||||
return this.delayResponse;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized boolean isReturnValueDelayed() {
|
|
||||||
return this.delayReturnValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isClientCellBlockSupported() {
|
public boolean isClientCellBlockSupported() {
|
||||||
return this.connection != null && this.connection.codec != null;
|
return this.connection != null && this.connection.codec != null;
|
||||||
|
@ -567,15 +512,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
responseBlockSize += blockSize;
|
responseBlockSize += blockSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* If we have a response, and delay is not set, then respond
|
|
||||||
* immediately. Otherwise, do not respond to client. This is
|
|
||||||
* called by the RPC code in the context of the Handler thread.
|
|
||||||
*/
|
|
||||||
public synchronized void sendResponseIfReady() throws IOException {
|
public synchronized void sendResponseIfReady() throws IOException {
|
||||||
if (!this.delayResponse) {
|
this.responder.doRespond(this);
|
||||||
this.responder.doRespond(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public UserGroupInformation getRemoteUser() {
|
public UserGroupInformation getRemoteUser() {
|
||||||
|
@ -2082,8 +2020,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
|
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
|
||||||
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
|
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
|
||||||
|
|
||||||
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
|
|
||||||
this.delayedCalls = new AtomicInteger(0);
|
|
||||||
this.ipcUtil = new IPCUtil(conf);
|
this.ipcUtil = new IPCUtil(conf);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,367 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
|
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
|
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.log4j.AppenderSkeleton;
|
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.apache.log4j.spi.LoggingEvent;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.protobuf.BlockingRpcChannel;
|
|
||||||
import com.google.protobuf.BlockingService;
|
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that delayed RPCs work. Fire up three calls, the first of which should
|
|
||||||
* be delayed. Check that the last two, which are undelayed, return before the
|
|
||||||
* first one.
|
|
||||||
*/
|
|
||||||
@Category({RPCTests.class, MediumTests.class}) // Fails sometimes with small tests
|
|
||||||
public class TestDelayedRpc {
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
|
|
||||||
public static RpcServerInterface rpcServer;
|
|
||||||
public static final int UNDELAYED = 0;
|
|
||||||
public static final int DELAYED = 1;
|
|
||||||
private static final int RPC_CLIENT_TIMEOUT = 30000;
|
|
||||||
|
|
||||||
@Test (timeout=60000)
|
|
||||||
public void testDelayedRpcImmediateReturnValue() throws Exception {
|
|
||||||
testDelayedRpc(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test (timeout=60000)
|
|
||||||
public void testDelayedRpcDelayedReturnValue() throws Exception {
|
|
||||||
testDelayedRpc(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
|
|
||||||
LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
|
||||||
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
|
||||||
BlockingService service =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
|
||||||
rpcServer = new RpcServer(null, "testDelayedRpc",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
|
||||||
isa,
|
|
||||||
conf,
|
|
||||||
new FifoRpcScheduler(conf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
|
||||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
|
||||||
try {
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
|
||||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
|
||||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
|
||||||
List<Integer> results = new ArrayList<Integer>();
|
|
||||||
// Setting true sets 'delayed' on the client.
|
|
||||||
TestThread th1 = new TestThread(stub, true, results);
|
|
||||||
// Setting 'false' means we will return UNDELAYED as response immediately.
|
|
||||||
TestThread th2 = new TestThread(stub, false, results);
|
|
||||||
TestThread th3 = new TestThread(stub, false, results);
|
|
||||||
th1.start();
|
|
||||||
Thread.sleep(100);
|
|
||||||
th2.start();
|
|
||||||
Thread.sleep(200);
|
|
||||||
th3.start();
|
|
||||||
|
|
||||||
th1.join();
|
|
||||||
th2.join();
|
|
||||||
th3.join();
|
|
||||||
|
|
||||||
// We should get the two undelayed responses first.
|
|
||||||
assertEquals(UNDELAYED, results.get(0).intValue());
|
|
||||||
assertEquals(UNDELAYED, results.get(1).intValue());
|
|
||||||
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
|
|
||||||
} finally {
|
|
||||||
rpcClient.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class ListAppender extends AppenderSkeleton {
|
|
||||||
private final List<String> messages = new ArrayList<String>();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void append(LoggingEvent event) {
|
|
||||||
messages.add(event.getMessage().toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean requiresLayout() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getMessages() {
|
|
||||||
return messages;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests that we see a WARN message in the logs.
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test (timeout=60000)
|
|
||||||
public void testTooManyDelayedRpcs() throws Exception {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
final int MAX_DELAYED_RPC = 10;
|
|
||||||
conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
|
|
||||||
// Set up an appender to catch the "Too many delayed calls" that we expect.
|
|
||||||
ListAppender listAppender = new ListAppender();
|
|
||||||
Logger log = Logger.getLogger(RpcServer.class);
|
|
||||||
log.addAppender(listAppender);
|
|
||||||
log.setLevel(Level.WARN);
|
|
||||||
|
|
||||||
|
|
||||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
|
||||||
TestDelayedImplementation instance = new TestDelayedImplementation(true);
|
|
||||||
BlockingService service =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
|
||||||
rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
|
||||||
isa,
|
|
||||||
conf,
|
|
||||||
new FifoRpcScheduler(conf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
|
||||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
|
||||||
try {
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
|
||||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
|
||||||
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
|
||||||
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
|
|
||||||
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
|
|
||||||
threads[i] = new TestThread(stub, true, null);
|
|
||||||
threads[i].start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* No warnings till here. */
|
|
||||||
assertTrue(listAppender.getMessages().isEmpty());
|
|
||||||
|
|
||||||
/* This should give a warning. */
|
|
||||||
threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
|
|
||||||
threads[MAX_DELAYED_RPC].start();
|
|
||||||
|
|
||||||
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
|
|
||||||
threads[i].join();
|
|
||||||
}
|
|
||||||
|
|
||||||
assertFalse(listAppender.getMessages().isEmpty());
|
|
||||||
assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
|
|
||||||
|
|
||||||
log.removeAppender(listAppender);
|
|
||||||
} finally {
|
|
||||||
rpcClient.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestDelayedImplementation
|
|
||||||
implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
|
|
||||||
/**
|
|
||||||
* Should the return value of delayed call be set at the end of the delay
|
|
||||||
* or at call return.
|
|
||||||
*/
|
|
||||||
private final boolean delayReturnValue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param delayReturnValue Should the response to the delayed call be set
|
|
||||||
* at the start or the end of the delay.
|
|
||||||
*/
|
|
||||||
public TestDelayedImplementation(boolean delayReturnValue) {
|
|
||||||
this.delayReturnValue = delayReturnValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TestResponse test(final RpcController rpcController, final TestArg testArg)
|
|
||||||
throws ServiceException {
|
|
||||||
boolean delay = testArg.getDelay();
|
|
||||||
TestResponse.Builder responseBuilder = TestResponse.newBuilder();
|
|
||||||
if (!delay) {
|
|
||||||
responseBuilder.setResponse(UNDELAYED);
|
|
||||||
return responseBuilder.build();
|
|
||||||
}
|
|
||||||
final Delayable call = RpcServer.getCurrentCall();
|
|
||||||
call.startDelay(delayReturnValue);
|
|
||||||
new Thread() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
Thread.sleep(500);
|
|
||||||
TestResponse.Builder responseBuilder = TestResponse.newBuilder();
|
|
||||||
call.endDelay(delayReturnValue ?
|
|
||||||
responseBuilder.setResponse(DELAYED).build() : null);
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.start();
|
|
||||||
// This value should go back to client only if the response is set
|
|
||||||
// immediately at delay time.
|
|
||||||
responseBuilder.setResponse(0xDEADBEEF);
|
|
||||||
return responseBuilder.build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestThread extends Thread {
|
|
||||||
private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
|
|
||||||
private final boolean delay;
|
|
||||||
private final List<Integer> results;
|
|
||||||
|
|
||||||
public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
|
|
||||||
boolean delay, List<Integer> results) {
|
|
||||||
this.stub = stub;
|
|
||||||
this.delay = delay;
|
|
||||||
this.results = results;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
Integer result;
|
|
||||||
try {
|
|
||||||
result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
|
|
||||||
getResponse());
|
|
||||||
} catch (ServiceException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
if (results != null) {
|
|
||||||
synchronized (results) {
|
|
||||||
results.add(result);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEndDelayThrowing() throws IOException {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
|
||||||
FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
|
|
||||||
BlockingService service =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
|
||||||
rpcServer = new RpcServer(null, "testEndDelayThrowing",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
|
|
||||||
isa,
|
|
||||||
conf,
|
|
||||||
new FifoRpcScheduler(conf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
RpcClient rpcClient = RpcClientFactory.createClient(
|
|
||||||
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
|
||||||
try {
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
|
||||||
ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
|
|
||||||
User.getCurrent(), 1000);
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
|
||||||
|
|
||||||
int result = 0xDEADBEEF;
|
|
||||||
|
|
||||||
try {
|
|
||||||
result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
|
|
||||||
} catch (Exception e) {
|
|
||||||
fail("No exception should have been thrown.");
|
|
||||||
}
|
|
||||||
assertEquals(result, UNDELAYED);
|
|
||||||
|
|
||||||
boolean caughtException = false;
|
|
||||||
try {
|
|
||||||
result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
|
|
||||||
} catch(Exception e) {
|
|
||||||
// Exception thrown by server is enclosed in a RemoteException.
|
|
||||||
if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
|
|
||||||
caughtException = true;
|
|
||||||
}
|
|
||||||
LOG.warn("Caught exception, expected=" + caughtException);
|
|
||||||
}
|
|
||||||
assertTrue(caughtException);
|
|
||||||
} finally {
|
|
||||||
rpcClient.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Delayed calls to this class throw an exception.
|
|
||||||
*/
|
|
||||||
private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
|
|
||||||
public FaultyTestDelayedImplementation() {
|
|
||||||
super(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TestResponse test(RpcController rpcController, TestArg arg)
|
|
||||||
throws ServiceException {
|
|
||||||
LOG.info("In faulty test, delay=" + arg.getDelay());
|
|
||||||
if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
|
|
||||||
Delayable call = RpcServer.getCurrentCall();
|
|
||||||
call.startDelay(true);
|
|
||||||
LOG.info("In faulty test, delaying");
|
|
||||||
try {
|
|
||||||
call.endDelayThrowing(new Exception("Something went wrong"));
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
// Client will receive the Exception, not this value.
|
|
||||||
return TestResponse.newBuilder().setResponse(DELAYED).build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
|
@ -32,20 +32,26 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
|
||||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
|
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
@ -70,6 +76,55 @@ public abstract class AbstractTestSecureIPC {
|
||||||
private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
|
private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
|
||||||
.getPath());
|
.getPath());
|
||||||
|
|
||||||
|
static final BlockingService SERVICE =
|
||||||
|
TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
|
||||||
|
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto ping(RpcController controller,
|
||||||
|
TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto error(RpcController controller,
|
||||||
|
TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EchoResponseProto echo(RpcController controller,
|
||||||
|
TestProtos.EchoRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
if (controller instanceof PayloadCarryingRpcController) {
|
||||||
|
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
|
||||||
|
// If cells, scan them to check we are able to iterate what we were given and since
|
||||||
|
// this is
|
||||||
|
// an echo, just put them back on the controller creating a new block. Tests our
|
||||||
|
// block
|
||||||
|
// building.
|
||||||
|
CellScanner cellScanner = pcrc.cellScanner();
|
||||||
|
List<Cell> list = null;
|
||||||
|
if (cellScanner != null) {
|
||||||
|
list = new ArrayList<Cell>();
|
||||||
|
try {
|
||||||
|
while (cellScanner.advance()) {
|
||||||
|
list.add(cellScanner.current());
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cellScanner = CellUtil.createCellScanner(list);
|
||||||
|
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
|
||||||
|
}
|
||||||
|
return TestProtos.EchoResponseProto.newBuilder()
|
||||||
|
.setMessage(request.getMessage()).build();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
private static MiniKdc KDC;
|
private static MiniKdc KDC;
|
||||||
private static String HOST = "localhost";
|
private static String HOST = "localhost";
|
||||||
private static String PRINCIPAL;
|
private static String PRINCIPAL;
|
||||||
|
@ -189,36 +244,31 @@ public abstract class AbstractTestSecureIPC {
|
||||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||||
SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
|
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
||||||
|
|
||||||
boolean delayReturnValue = false;
|
|
||||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
||||||
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
|
||||||
BlockingService service =
|
|
||||||
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
|
|
||||||
|
|
||||||
RpcServerInterface rpcServer =
|
RpcServerInterface rpcServer =
|
||||||
new RpcServer(null, "testSecuredDelayedRpc",
|
new RpcServer(null, "AbstractTestSecureIPC",
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
|
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
|
||||||
serverConf, new FifoRpcScheduler(serverConf, 1));
|
serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||||
rpcServer.start();
|
rpcServer.start();
|
||||||
RpcClient rpcClient =
|
try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
|
||||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
||||||
try {
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
throw new IOException("Listener channel is closed");
|
throw new IOException("Listener channel is closed");
|
||||||
}
|
}
|
||||||
BlockingRpcChannel channel =
|
BlockingRpcChannel channel =
|
||||||
rpcClient.createBlockingRpcChannel(
|
rpcClient.createBlockingRpcChannel(
|
||||||
ServerName.valueOf(address.getHostName(), address.getPort(),
|
ServerName.valueOf(address.getHostName(), address.getPort(),
|
||||||
System.currentTimeMillis()), clientUser, 0);
|
System.currentTimeMillis()), clientUser, 0);
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
|
||||||
List<Integer> results = new ArrayList<>();
|
List<String> results = new ArrayList<>();
|
||||||
TestThread th1 = new TestThread(stub, true, results);
|
TestThread th1 = new TestThread(stub, results);
|
||||||
final Throwable exception[] = new Throwable[1];
|
final Throwable exception[] = new Throwable[1];
|
||||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
Collections.synchronizedList(new ArrayList<Throwable>());
|
||||||
Thread.UncaughtExceptionHandler exceptionHandler =
|
Thread.UncaughtExceptionHandler exceptionHandler =
|
||||||
new Thread.UncaughtExceptionHandler() {
|
new Thread.UncaughtExceptionHandler() {
|
||||||
public void uncaughtException(Thread th, Throwable ex) {
|
public void uncaughtException(Thread th, Throwable ex) {
|
||||||
|
@ -235,11 +285,35 @@ public abstract class AbstractTestSecureIPC {
|
||||||
}
|
}
|
||||||
throw (Exception) exception[0];
|
throw (Exception) exception[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(0xDEADBEEF, results.get(0).intValue());
|
|
||||||
} finally {
|
} finally {
|
||||||
rpcClient.close();
|
|
||||||
rpcServer.stop();
|
rpcServer.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestThread extends Thread {
|
||||||
|
private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
|
||||||
|
|
||||||
|
private final List<String> results;
|
||||||
|
|
||||||
|
public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
|
||||||
|
this.stub = stub;
|
||||||
|
this.results = results;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
String result;
|
||||||
|
try {
|
||||||
|
result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
|
||||||
|
ThreadLocalRandom.current().nextInt())).build()).getMessage();
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
if (results != null) {
|
||||||
|
synchronized (results) {
|
||||||
|
results.add(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
|
|
||||||
option java_outer_classname = "TestDelayedRpcProtos";
|
|
||||||
option java_generic_services = true;
|
|
||||||
option java_generate_equals_and_hash = true;
|
|
||||||
|
|
||||||
|
|
||||||
message TestArg {
|
|
||||||
required bool delay = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message TestResponse {
|
|
||||||
required int32 response = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
service TestDelayedService {
|
|
||||||
rpc test(TestArg) returns (TestResponse);
|
|
||||||
}
|
|
Loading…
Reference in New Issue