HBASE-21061 Fix inconsistent synchronization in RpcServer
move variables that we don't need synchronized access to out of the critical block. Signed-off-by: Mike Drob <mdrob@apache.org>
This commit is contained in:
parent
d07cab18a6
commit
3e45e0202b
|
@ -1640,7 +1640,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public synchronized int readAndProcess() throws IOException, InterruptedException {
|
||||
public int readAndProcess() throws IOException, InterruptedException {
|
||||
// If we have not read the connection setup preamble, look to see if that is on the wire.
|
||||
if (!connectionPreambleRead) {
|
||||
int count = readPreamble();
|
||||
|
@ -1668,85 +1668,95 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
}
|
||||
|
||||
// We have read a length and we have read the preamble. It is either the connection header
|
||||
// or it is a request.
|
||||
if (data == null) {
|
||||
dataLengthBuffer.flip();
|
||||
int dataLength = dataLengthBuffer.getInt();
|
||||
if (dataLength == RpcClient.PING_CALL_ID) {
|
||||
if (!useWrap) { //covers the !useSasl too
|
||||
dataLengthBuffer.clear();
|
||||
return 0; //ping message
|
||||
}
|
||||
}
|
||||
if (dataLength < 0) { // A data length of zero is legal.
|
||||
throw new DoNotRetryIOException("Unexpected data length "
|
||||
+ dataLength + "!! from " + getHostAddress());
|
||||
}
|
||||
final boolean useWrap = this.useWrap;
|
||||
final BlockingService service = this.service;
|
||||
final boolean headerAndPreambleRead = connectionHeaderRead && connectionPreambleRead;
|
||||
final boolean canUseRequestTooBig = headerAndPreambleRead &&
|
||||
VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
|
||||
RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION);
|
||||
|
||||
if (dataLength > maxRequestSize) {
|
||||
String msg = "RPC data length of " + dataLength + " received from "
|
||||
+ getHostAddress() + " is greater than max allowed "
|
||||
+ maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
|
||||
+ "\" on server to override this limit (not recommended)";
|
||||
LOG.warn(msg);
|
||||
|
||||
if (connectionHeaderRead && connectionPreambleRead) {
|
||||
incRpcCount();
|
||||
// Construct InputStream for the non-blocking SocketChannel
|
||||
// We need the InputStream because we want to read only the request header
|
||||
// instead of the whole rpc.
|
||||
final ByteBuffer buf = ByteBuffer.allocate(1);
|
||||
InputStream is = new InputStream() {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
channelRead(channel, buf);
|
||||
buf.flip();
|
||||
int x = buf.get();
|
||||
buf.flip();
|
||||
return x;
|
||||
}
|
||||
};
|
||||
CodedInputStream cis = CodedInputStream.newInstance(is);
|
||||
int headerSize = cis.readRawVarint32();
|
||||
Message.Builder builder = RequestHeader.newBuilder();
|
||||
ProtobufUtil.mergeFrom(builder, cis, headerSize);
|
||||
RequestHeader header = (RequestHeader) builder.build();
|
||||
|
||||
// Notify the client about the offending request
|
||||
Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
|
||||
null, this, responder, 0, null, this.addr,0);
|
||||
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
|
||||
// Make sure the client recognizes the underlying exception
|
||||
// Otherwise, throw a DoNotRetryIOException.
|
||||
if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
|
||||
RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
|
||||
setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
|
||||
} else {
|
||||
setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
|
||||
// we're guarding against data being modified concurrently
|
||||
// while trying to keep other instance members out of the block
|
||||
synchronized(this) {
|
||||
// We have read a length and we have read the preamble. It is either the connection header
|
||||
// or it is a request.
|
||||
if (data == null) {
|
||||
dataLengthBuffer.flip();
|
||||
int dataLength = dataLengthBuffer.getInt();
|
||||
if (dataLength == RpcClient.PING_CALL_ID) {
|
||||
if (!useWrap) { //covers the !useSasl too
|
||||
dataLengthBuffer.clear();
|
||||
return 0; //ping message
|
||||
}
|
||||
// We are going to close the connection, make sure we process the response
|
||||
// before that. In rare case when this fails, we still close the connection.
|
||||
responseWriteLock.lock();
|
||||
responder.processResponse(reqTooBig);
|
||||
responseWriteLock.unlock();
|
||||
}
|
||||
// Close the connection
|
||||
return -1;
|
||||
if (dataLength < 0) { // A data length of zero is legal.
|
||||
throw new DoNotRetryIOException("Unexpected data length "
|
||||
+ dataLength + "!! from " + getHostAddress());
|
||||
}
|
||||
|
||||
if (dataLength > maxRequestSize) {
|
||||
String msg = "RPC data length of " + dataLength + " received from "
|
||||
+ getHostAddress() + " is greater than max allowed "
|
||||
+ maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
|
||||
+ "\" on server to override this limit (not recommended)";
|
||||
LOG.warn(msg);
|
||||
|
||||
if (headerAndPreambleRead) {
|
||||
incRpcCount();
|
||||
// Construct InputStream for the non-blocking SocketChannel
|
||||
// We need the InputStream because we want to read only the request header
|
||||
// instead of the whole rpc.
|
||||
final ByteBuffer buf = ByteBuffer.allocate(1);
|
||||
InputStream is = new InputStream() {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
channelRead(channel, buf);
|
||||
buf.flip();
|
||||
int x = buf.get();
|
||||
buf.flip();
|
||||
return x;
|
||||
}
|
||||
};
|
||||
CodedInputStream cis = CodedInputStream.newInstance(is);
|
||||
int headerSize = cis.readRawVarint32();
|
||||
Message.Builder builder = RequestHeader.newBuilder();
|
||||
ProtobufUtil.mergeFrom(builder, cis, headerSize);
|
||||
RequestHeader header = (RequestHeader) builder.build();
|
||||
|
||||
// Notify the client about the offending request
|
||||
Call reqTooBig = new Call(header.getCallId(), service, null, null, null,
|
||||
null, this, responder, 0, null, this.addr,0);
|
||||
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
|
||||
// Make sure the client recognizes the underlying exception
|
||||
// Otherwise, throw a DoNotRetryIOException.
|
||||
if (canUseRequestTooBig) {
|
||||
setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
|
||||
} else {
|
||||
setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
|
||||
}
|
||||
// We are going to close the connection, make sure we process the response
|
||||
// before that. In rare case when this fails, we still close the connection.
|
||||
responseWriteLock.lock();
|
||||
responder.processResponse(reqTooBig);
|
||||
responseWriteLock.unlock();
|
||||
}
|
||||
// Close the connection
|
||||
return -1;
|
||||
}
|
||||
|
||||
data = ByteBuffer.allocate(dataLength);
|
||||
|
||||
// Increment the rpc count. This counter will be decreased when we write
|
||||
// the response. If we want the connection to be detected as idle properly, we
|
||||
// need to keep the inc / dec correct.
|
||||
incRpcCount();
|
||||
}
|
||||
|
||||
data = ByteBuffer.allocate(dataLength);
|
||||
count = channelRead(channel, data);
|
||||
|
||||
// Increment the rpc count. This counter will be decreased when we write
|
||||
// the response. If we want the connection to be detected as idle properly, we
|
||||
// need to keep the inc / dec correct.
|
||||
incRpcCount();
|
||||
}
|
||||
|
||||
count = channelRead(channel, data);
|
||||
|
||||
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
|
||||
process();
|
||||
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
|
||||
process();
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
|
|
Loading…
Reference in New Issue