HADOOP-9832. Add RPC header to client ping (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510793 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-08-05 23:01:27 +00:00
parent ed2c62012a
commit 63a1273f2a
4 changed files with 51 additions and 24 deletions

View File

@ -18,10 +18,11 @@
package org.apache.hadoop.ipc;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
@ -382,6 +383,7 @@ public class Client {
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@ -407,6 +409,15 @@ public class Client {
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
if (doPing) {
// construct a RPC header with the callId as the ping callId
pingRequest = new ByteArrayOutputStream();
RpcRequestHeaderProto pingHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId);
pingHeader.writeDelimitedTo(pingRequest);
}
this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
@ -910,7 +921,8 @@ public class Client {
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
out.writeInt(RpcConstants.PING_CALL_ID);
out.writeInt(pingRequest.size());
pingRequest.writeTo(out);
out.flush();
}
}

View File

@ -27,13 +27,13 @@ public class RpcConstants {
// Hidden Constructor
}
public static final int PING_CALL_ID = -1;
public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
public static final int INVALID_CALL_ID = -2;
public static final int CONNECTION_CONTEXT_CALL_ID = -3;
public static final int PING_CALL_ID = -4;
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
public static final int INVALID_CALL_ID = -2;
public static final int CONNECTION_CONTEXT_CALL_ID = -3;
public static final int INVALID_RETRY_COUNT = -1;

View File

@ -72,8 +72,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.*;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@ -1177,9 +1176,7 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@ -1523,11 +1520,6 @@ public abstract class Server {
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
// covers the !useSasl too
dataLengthBuffer.clear();
return 0; // ping message
}
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}
@ -1738,13 +1730,6 @@ public abstract class Server {
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
if (LOG.isDebugEnabled())
LOG.debug("Received ping message");
unwrappedDataLengthBuffer.clear();
continue; // ping message
}
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
@ -1913,6 +1898,8 @@ public abstract class Server {
"SASL protocol not requested by client");
}
saslReadAndProcess(dis);
} else if (callId == PING_CALL_ID) {
LOG.debug("Received ping message");
} else {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@ -1926,7 +1913,7 @@ public abstract class Server {
*/
private void authorizeConnection() throws WrappedRpcServerException {
try {
// If auth method is DIGEST, the token was obtained by the
// If auth method is TOKEN, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication

View File

@ -100,6 +100,7 @@ public class TestRPC {
void ping() throws IOException;
void slowPing(boolean shouldSlow) throws IOException;
void sleep(long delay) throws IOException, InterruptedException;
String echo(String value) throws IOException;
String[] echo(String[] value) throws IOException;
Writable echo(Writable value) throws IOException;
@ -145,6 +146,11 @@ public class TestRPC {
}
}
@Override
public void sleep(long delay) throws InterruptedException {
Thread.sleep(delay);
}
@Override
public String echo(String value) throws IOException { return value; }
@ -932,6 +938,28 @@ public class TestRPC {
}
}
@Test
public void testConnectionPing() throws Exception {
Configuration conf = new Configuration();
int pingInterval = 50;
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.build();
server.start();
final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, server.getListenerAddress(), conf);
try {
// this call will throw exception if server couldn't decode the ping
proxy.sleep(pingInterval*4);
} finally {
if (proxy != null) RPC.stopProxy(proxy);
}
}
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);