HBASE-15793 Port over AsyncCall improvements

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jurriaan Mous 2016-05-07 12:46:58 +02:00 committed by stack
parent ac31ceb835
commit fa033b6a08
9 changed files with 346 additions and 93 deletions

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Promise for responses
* @param <V> Value type
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface Future<V> extends io.netty.util.concurrent.Future<V> {
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Specific interface for the Response future listener
* @param <V> Value type.
*/
@InterfaceAudience.Private
public interface ResponseFutureListener<V>
extends GenericFutureListener<Future<V>> {
}

View File

@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.DefaultPromise;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
@ -31,51 +30,72 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
/**
* Represents an Async Hbase call and its response.
*
* Responses are passed on to its given doneHandler and failures to the rpcController
*
* @param <T> Type of message returned
* @param <M> Message returned in communication to be converted
*/
@InterfaceAudience.Private
public class AsyncCall extends DefaultPromise<Message> {
public class AsyncCall<M extends Message, T> extends Promise<T> {
private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
final int id;
private final AsyncRpcChannelImpl channel;
final Descriptors.MethodDescriptor method;
final Message param;
final PayloadCarryingRpcController controller;
final Message responseDefaultType;
private final MessageConverter<M,T> messageConverter;
final long startTime;
final long rpcTimeout;
private final IOExceptionConverter exceptionConverter;
// For only the request
private final CellScanner cellScanner;
private final int priority;
final MetricsConnection.CallStats callStats;
/**
* Constructor
*
* @param eventLoop for call
* @param channel which initiated call
* @param connectId connection id
* @param md the method descriptor
* @param param parameters to send to Server
* @param controller controller for response
* @param cellScanner cellScanner containing cells to send as request
* @param responseDefaultType the default response type
* @param messageConverter converts the messages to what is the expected output
* @param rpcTimeout timeout for this call in ms
* @param priority for this request
*/
public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
param, PayloadCarryingRpcController controller, Message responseDefaultType,
public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
MetricsConnection.CallStats callStats) {
super(eventLoop);
super(channel.getEventExecutor());
this.channel = channel;
this.id = connectId;
this.method = md;
this.param = param;
this.controller = controller;
this.responseDefaultType = responseDefaultType;
this.messageConverter = messageConverter;
this.exceptionConverter = exceptionConverter;
this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
this.rpcTimeout = rpcTimeout;
this.priority = priority;
this.cellScanner = cellScanner;
this.callStats = callStats;
}
@ -101,17 +121,19 @@ public class AsyncCall extends DefaultPromise<Message> {
* @param value to set
* @param cellBlockScanner to set
*/
public void setSuccess(Message value, CellScanner cellBlockScanner) {
if (cellBlockScanner != null) {
controller.setCellScanner(cellBlockScanner);
}
public void setSuccess(M value, CellScanner cellBlockScanner) {
if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
}
this.setSuccess(value);
try {
this.setSuccess(
this.messageConverter.convert(value, cellBlockScanner)
);
} catch (IOException e) {
this.setFailed(e);
}
}
/**
@ -127,6 +149,10 @@ public class AsyncCall extends DefaultPromise<Message> {
exception = ((RemoteException) exception).unwrapRemoteException();
}
if (this.exceptionConverter != null) {
exception = this.exceptionConverter.convert(exception);
}
this.setFailure(exception);
}
@ -138,4 +164,27 @@ public class AsyncCall extends DefaultPromise<Message> {
public long getRpcTimeout() {
return rpcTimeout;
}
/**
* @return Priority for this call
*/
public int getPriority() {
return priority;
}
/**
* Get the cellScanner for this request.
* @return CellScanner
*/
public CellScanner cellScanner() {
return cellScanner;
}
@Override
public boolean cancel(boolean mayInterupt){
this.channel.removePendingCall(this.id);
return super.cancel(mayInterupt);
}
}

View File

@ -21,11 +21,12 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
/**
@ -37,13 +38,23 @@ public interface AsyncRpcChannel {
/**
* Calls method on channel
* @param method to call
* @param controller to run call with
* @param request to send
* @param cellScanner with cells to send
* @param responsePrototype to construct response with
* @param messageConverter for the messages to expected result
* @param exceptionConverter for converting exceptions
* @param rpcTimeout timeout for request
* @param priority for request
* @param callStats collects stats of the call
* @return Promise for the response Message
*/
Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype, MetricsConnection.CallStats callStats);
<R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats);
/**
* Get the EventLoop on which this channel operated

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
@ -32,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.ConnectException;
@ -51,8 +48,10 @@ import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -291,36 +290,25 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
/**
* Calls method on channel
* @param method to call
* @param controller to run call with
* @param request to send
* @param cellScanner with cells to send
* @param responsePrototype to construct response with
* @param rpcTimeout timeout for request
* @param priority for request
* @return Promise for the response Message
*/
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
method, request, controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
// TODO: do not need to call AsyncCall.setFailed?
synchronized (pendingCalls) {
pendingCalls.remove(call.id);
}
}
});
// TODO: this should be handled by PayloadCarryingRpcController.
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}
public <R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) {
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
rpcTimeout, priority, callStats);
synchronized (pendingCalls) {
if (closed) {
Promise<Message> promise = channel.eventLoop().newPromise();
promise.setFailure(new ConnectException());
return promise;
call.setFailure(new ConnectException());
return call;
}
pendingCalls.put(call.id, call);
// Add timeout for cleanup if none is present
@ -398,7 +386,7 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
}
ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
if (cellBlock != null) {
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
.newBuilder();
@ -406,8 +394,8 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
// Only pass priority if there one. Let zero be same as no priority.
if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
requestHeaderBuilder.setPriority(call.controller.getPriority());
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
requestHeaderBuilder.setPriority(call.getPriority());
}
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();

View File

@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@ -30,9 +36,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -52,7 +55,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.ResponseFutureListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVM;
@ -60,13 +65,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/**
* Netty client for the requests and responses
*/
@ -242,7 +240,18 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority(),
callStats);
pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
// Will automatically fail the promise with CancellationException
promise.cancel(true);
}
});
long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
try {
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
@ -259,6 +268,18 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
private MessageConverter<Message, Message> getMessageConverterWithRpcController(
final PayloadCarryingRpcController pcrc) {
return new
MessageConverter<Message, Message>() {
@Override
public Message convert(Message msg, CellScanner cellScanner) {
pcrc.setCellScanner(cellScanner);
return msg;
}
};
}
/**
* Call method async
*/
@ -269,42 +290,46 @@ public class AsyncRpcClient extends AbstractRpcClient {
try {
connection = createRpcChannel(md.getService().getName(), addr, ticket);
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
GenericFutureListener<Future<Message>> listener =
new GenericFutureListener<Future<Message>>() {
@Override
public void operationComplete(Future<Message> future) throws Exception {
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
ResponseFutureListener<Message> listener =
new ResponseFutureListener<Message>() {
@Override
public void operationComplete(Future<Message> future) throws Exception {
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
if (!future.isSuccess()) {
Throwable cause = future.cause();
if (cause instanceof IOException) {
pcrc.setFailed((IOException) cause);
} else {
pcrc.setFailed(new IOException(cause));
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
if (!future.isSuccess()) {
Throwable cause = future.cause();
} else {
try {
done.run(future.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
pcrc.setFailed((IOException) cause);
} else {
pcrc.setFailed(new IOException(cause));
}
} else {
try {
done.run(future.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
pcrc.setFailed((IOException) cause);
} else {
pcrc.setFailed(new IOException(cause));
}
} catch (InterruptedException e) {
pcrc.setFailed(new IOException(e));
}
} catch (InterruptedException e) {
pcrc.setFailed(new IOException(e));
}
}
};
}
};
cs.setStartTime(EnvironmentEdgeManager.currentTime());
connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null,
pcrc.getCallTimeout(), pcrc.getPriority(), cs)
.addListener(listener);
} catch (StoppedRpcClientException|FailedServerException e) {
pcrc.setFailed(e);
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Converts exceptions to other exceptions
*/
@InterfaceAudience.Private
public interface IOExceptionConverter {
/**
* Converts given IOException
* @param e exception to convert
* @return converted IOException
*/
IOException convert(IOException e);
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Interface to convert Messages to specific types
* @param <M> Message Type to convert
* @param <O> Output Type
*/
@InterfaceAudience.Private
public interface MessageConverter<M,O> {
/**
* Converts Message to Output
* @param msg to convert
* @param cellScanner to use for conversion
* @return Output
* @throws IOException if message could not be converted to response
*/
O convert(M msg, CellScanner cellScanner) throws IOException;
MessageConverter<Message,Message> NO_CONVERTER = new MessageConverter<Message, Message>() {
@Override
public Message convert(Message msg, CellScanner cellScanner) throws IOException {
return null;
}
};
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
/**
* Abstract response promise
* @param <T> Type of result contained in Promise
*/
@InterfaceAudience.Private
public class Promise<T> extends DefaultPromise<T> implements Future<T> {
/**
* Constructor
* @param eventLoop to handle events on
*/
public Promise(EventExecutor eventLoop) {
super(eventLoop);
}
}