HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.

This commit is contained in:
Erik Krogen 2018-04-04 15:42:39 -07:00 committed by Chen Liang
parent 1eeca2d9fb
commit c0ca2bb853
9 changed files with 173 additions and 17 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
@ -48,4 +49,17 @@ public interface AlignmentContext {
*/
void receiveResponseState(RpcResponseHeaderProto header);
/**
* This is the intended client method call to pull last seen state info
* into RPC request processing.
* @param header The RPC request header builder.
*/
void updateRequestState(RpcRequestHeaderProto.Builder header);
/**
* This is the intended server method call to implement to receive
* client state info during RPC response header processing.
* @param header The RPC request header.
*/
void receiveRequestState(RpcRequestHeaderProto header);
}

View File

@ -1114,7 +1114,7 @@ public class Client implements AutoCloseable {
// Items '1' and '2' are prepared here.
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
clientId, alignmentContext);
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);

View File

@ -2486,6 +2486,11 @@ public abstract class Server {
}
}
if (alignmentContext != null) {
// Check incoming RPC request's state.
alignmentContext.receiveRequestState(header);
}
CallerContext callerContext = null;
if (header.hasCallerContext()) {
callerContext =

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.util;
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@ -165,6 +166,13 @@ public abstract class ProtoUtil {
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId,
int retryCount, byte[] uuid) {
return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
null);
}
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId,
int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
.setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
@ -190,6 +198,11 @@ public abstract class ProtoUtil {
result.setCallerContext(contextBuilder);
}
// Add alignment context if it is not null
if (alignmentContext != null) {
alignmentContext.updateRequestState(result);
}
return result.build();
}
}

View File

@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional sint32 retryCount = 5 [default = -1];
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.util.concurrent.atomic.LongAccumulator;
@ -33,16 +34,11 @@ import java.util.concurrent.atomic.LongAccumulator;
@InterfaceStability.Stable
class ClientGCIContext implements AlignmentContext {
private final DFSClient dfsClient;
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
/**
* Client side constructor.
* @param dfsClient client side state receiver
*/
ClientGCIContext(DFSClient dfsClient) {
this.dfsClient = dfsClient;
long getLastSeenStateId() {
return lastSeenStateId.get();
}
/**
@ -55,11 +51,27 @@ class ClientGCIContext implements AlignmentContext {
}
/**
* Client side implementation for receiving state alignment info.
* Client side implementation for receiving state alignment info in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
dfsClient.lastSeenStateId = lastSeenStateId.get();
}
/**
* Client side implementation for providing state alignment info in requests.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
header.setStateId(lastSeenStateId.longValue());
}
/**
* Client side implementation only provides state alignment info in requests.
* Client does not receive RPC requests therefore this does nothing.
*/
@Override
public void receiveRequestState(RpcRequestHeaderProto header) {
// Do nothing.
}
}

View File

@ -220,7 +220,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final UserGroupInformation ugi;
volatile boolean clientRunning = true;
volatile long lastLeaseRenewal;
volatile long lastSeenStateId;
private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
final String clientName;
@ -243,6 +242,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int smallBufferSize;
private final long serverDefaultsValidityPeriod;
private final ClientGCIContext alignmentContext;
public DfsClientConf getConf() {
return dfsClientConf;
@ -398,7 +398,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
Client.setAlignmentContext(new ClientGCIContext(this));
this.alignmentContext = new ClientGCIContext();
Client.setAlignmentContext(alignmentContext);
}
/**
@ -547,6 +548,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return clientRunning;
}
@VisibleForTesting
ClientGCIContext getAlignmentContext() {
return alignmentContext;
}
long getLastLeaseRenewal() {
return lastLeaseRenewal;
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
}
/**
* Server side implementation for providing state alignment info.
* Server side implementation for providing state alignment info in responses.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext {
public void receiveResponseState(RpcResponseHeaderProto header) {
// Do nothing.
}
/**
* Server side implementation only receives state alignment info.
* It does not build RPC requests therefore this does nothing.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
// Do nothing.
}
/**
* Server side implementation for processing state alignment info in requests.
*/
@Override
public void receiveRequestState(RpcRequestHeaderProto header) {
long serverStateId = namesystem.getLastWrittenTransactionId();
long clientStateId = header.getStateId();
if (clientStateId > serverStateId) {
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
", but server state is: " + serverStateId);
}
}
}

View File

@ -18,20 +18,30 @@
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Class is used to test server sending state alignment information to clients
@ -91,7 +101,7 @@ public class TestStateAlignmentContext {
public void testStateTransferOnWrite() throws Exception {
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
long clientState = dfs.dfs.lastSeenStateId;
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
@ -109,7 +119,8 @@ public class TestStateAlignmentContext {
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
assertThat(clientState, is(lastWrittenId));
}
/**
@ -122,10 +133,80 @@ public class TestStateAlignmentContext {
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
}
}
/**
* This test mocks an AlignmentContext and ensures that DFSClient
* writes its lastSeenStateId into RPC requests.
*/
@Test
public void testClientSendsState() throws Exception {
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
Client.setAlignmentContext(spiedAlignContext);
// Collect RpcRequestHeaders for verification later.
final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
new ArrayList<>();
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
collectedHeaders.add(header);
return a.callRealMethod();
}).when(spiedAlignContext).updateRequestState(Mockito.any());
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
// Ensure first header and last header have different state.
assertThat(collectedHeaders.size() > 1, is(true));
assertThat(collectedHeaders.get(0).getStateId(),
is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
// Ensure collected RpcRequestHeaders are in increasing order.
long lastHeader = collectedHeaders.get(0).getStateId();
for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
collectedHeaders.subList(1, collectedHeaders.size())) {
long currentHeader = header.getStateId();
assertThat(currentHeader >= lastHeader, is(true));
lastHeader = header.getStateId();
}
}
/**
* This test mocks an AlignmentContext to send stateIds greater than
* server's stateId in RPC requests.
*/
@Test
public void testClientSendsGreaterState() throws Exception {
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
Client.setAlignmentContext(spiedAlignContext);
// Make every client call have a stateId > server's stateId.
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
try {
return a.callRealMethod();
} finally {
header.setStateId(Long.MAX_VALUE);
}
}).when(spiedAlignContext).updateRequestState(Mockito.any());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
logCapturer.stopCapturing();
String output = logCapturer.getOutput();
assertThat(output, containsString("A client sent stateId: "));
}
}