HDFS-12977. [SBN read] Add stateId to RPC headers. Contributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2018-03-20 18:48:40 -07:00 committed by Chen Liang
parent 4cf63905d0
commit 21a886a4ef
14 changed files with 389 additions and 12 deletions

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
* This interface intends to align the state between client and server
* via RPC communication.
*
* This should be implemented separately on the client side and server side
* and can be used to pass state information on RPC responses from server
* to client.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface AlignmentContext {
/**
* This is the intended server method call to implement to pass state info
* during RPC response header construction.
* @param header The RPC response header builder.
*/
void updateResponseState(RpcResponseHeaderProto.Builder header);
/**
* This is the intended client method call to implement to recieve state info
* during RPC response processing.
* @param header The RPC response header.
*/
void receiveResponseState(RpcResponseHeaderProto header);
}

View File

@ -103,6 +103,12 @@ public class Client implements AutoCloseable {
return false; return false;
} }
}; };
private static AlignmentContext alignmentContext;
/** Set alignment context to use to fetch state alignment info from RPC. */
public static void setAlignmentContext(AlignmentContext ac) {
alignmentContext = ac;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Unstable @Unstable
@ -1186,6 +1192,9 @@ public class Client implements AutoCloseable {
final Call call = calls.remove(callId); final Call call = calls.remove(callId);
call.setRpcResponse(value); call.setRpcResponse(value);
} }
if (alignmentContext != null) {
alignmentContext.receiveResponseState(header);
}
// verify that packet length was correct // verify that packet length was correct
if (packet.remaining() > 0) { if (packet.remaining() > 0) {
throw new RpcClientException("RPC response length mismatch"); throw new RpcClientException("RPC response length mismatch");

View File

@ -337,11 +337,11 @@ public class ProtobufRpcEngine implements RpcEngine {
String bindAddress, int port, int numHandlers, int numReaders, String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf, int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) String portRangeConfig, AlignmentContext alignmentContext)
throws IOException { throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port, return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig); portRangeConfig, alignmentContext);
} }
public static class Server extends RPC.Server { public static class Server extends RPC.Server {
@ -410,17 +410,18 @@ public class ProtobufRpcEngine implements RpcEngine {
* @param numHandlers the number of method handler threads to run * @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged * @param verbose whether each call should be logged
* @param portRangeConfig A config parameter that can be used to restrict * @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port) * @param alignmentContext provides server state info on client responses
*/ */
public Server(Class<?> protocolClass, Object protocolImpl, public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, int numHandlers, Configuration conf, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose, int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) String portRangeConfig, AlignmentContext alignmentContext)
throws IOException { throws IOException {
super(bindAddress, port, null, numHandlers, super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager, portRangeConfig); .getClass().getName()), secretManager, portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose; this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl); protocolImpl);

View File

@ -717,6 +717,7 @@ public class RPC {
private final Configuration conf; private final Configuration conf;
private SecretManager<? extends TokenIdentifier> secretManager = null; private SecretManager<? extends TokenIdentifier> secretManager = null;
private String portRangeConfig = null; private String portRangeConfig = null;
private AlignmentContext alignmentContext = null;
public Builder(Configuration conf) { public Builder(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -783,6 +784,12 @@ public class RPC {
return this; return this;
} }
/** Default: null */
public Builder setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
return this;
}
/** /**
* Build the RPC Server. * Build the RPC Server.
* @throws IOException on error * @throws IOException on error
@ -802,7 +809,8 @@ public class RPC {
return getProtocolEngine(this.protocol, this.conf).getServer( return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port, this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler, this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig); this.verbose, this.conf, this.secretManager, this.portRangeConfig,
this.alignmentContext);
} }
} }

View File

@ -67,6 +67,7 @@ public interface RpcEngine {
* @param secretManager The secret manager to use to validate incoming requests. * @param secretManager The secret manager to use to validate incoming requests.
* @param portRangeConfig A config parameter that can be used to restrict * @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port) * the range of ports used when port is 0 (an ephemeral port)
* @param alignmentContext provides server state info on client responses
* @return The Server instance * @return The Server instance
* @throws IOException on any error * @throws IOException on any error
*/ */
@ -75,8 +76,8 @@ public interface RpcEngine {
int queueSizePerHandler, boolean verbose, int queueSizePerHandler, boolean verbose,
Configuration conf, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig String portRangeConfig,
) throws IOException; AlignmentContext alignmentContext) throws IOException;
/** /**
* Returns a proxy for ProtocolMetaInfoPB, which uses the given connection * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection

View File

@ -141,6 +141,12 @@ public abstract class Server {
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
private Tracer tracer; private Tracer tracer;
private AlignmentContext alignmentContext;
/**
* Logical name of the server used in metrics and monitor.
*/
private final String serverName;
/** /**
* Add exception classes for which server won't log stack traces. * Add exception classes for which server won't log stack traces.
* *
@ -159,6 +165,15 @@ public abstract class Server {
exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass); exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);
} }
/**
* Set alignment context to pass state info thru RPC.
*
* @param alignmentContext alignment state context
*/
public void setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
}
/** /**
* ExceptionsHandler manages Exception groups for special handling * ExceptionsHandler manages Exception groups for special handling
* e.g., terse exception group for concise logging messages * e.g., terse exception group for concise logging messages
@ -2775,6 +2790,7 @@ public abstract class Server {
this.rpcRequestClass = rpcRequestClass; this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount; this.handlerCount = handlerCount;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.serverName = serverName;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) { if (queueSizePerHandler != -1) {
@ -2916,6 +2932,9 @@ public abstract class Server {
headerBuilder.setRetryCount(call.retryCount); headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status); headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if(alignmentContext != null) {
alignmentContext.updateResponseState(headerBuilder);
}
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build(); RpcResponseHeaderProto header = headerBuilder.build();

View File

@ -323,11 +323,11 @@ public class WritableRpcEngine implements RpcEngine {
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) String portRangeConfig, AlignmentContext alignmentContext)
throws IOException { throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port, return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig); portRangeConfig, alignmentContext);
} }
@ -397,18 +397,45 @@ public class WritableRpcEngine implements RpcEngine {
* @param port the port to listen for connections on * @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run * @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged * @param verbose whether each call should be logged
*
* @deprecated use Server#Server(Class, Object,
* Configuration, String, int, int, int, int, boolean, SecretManager)
*/ */
@Deprecated
public Server(Class<?> protocolClass, Object protocolImpl, public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) String portRangeConfig)
throws IOException { throws IOException {
this(null, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose,
secretManager, null, null);
}
/**
* Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
* @param alignmentContext provides server state info on client responses
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders, super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf, queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager, classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig); portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose; this.verbose = verbose;

View File

@ -155,6 +155,7 @@ message RpcResponseHeaderProto {
optional RpcErrorCodeProto errorDetail = 6; // in case of error optional RpcErrorCodeProto errorDetail = 6; // in case of error
optional bytes clientId = 7; // Globally unique client ID optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1]; optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
} }
message RpcSaslProto { message RpcSaslProto {

View File

@ -301,7 +301,8 @@ public class TestRPC extends TestRpcBase {
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException { String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return null; return null;
} }

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.util.concurrent.atomic.LongAccumulator;
/**
* This is the client side implementation responsible for receiving
* state alignment info from server(s).
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
class ClientGCIContext implements AlignmentContext {
private final DFSClient dfsClient;
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
/**
* Client side constructor.
* @param dfsClient client side state receiver
*/
ClientGCIContext(DFSClient dfsClient) {
this.dfsClient = dfsClient;
}
/**
* Client side implementation only receives state alignment info.
* It does not provide state alignment info therefore this does nothing.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
// Do nothing.
}
/**
* Client side implementation for receiving state alignment info.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
dfsClient.lastSeenStateId = lastSeenStateId.get();
}
}

View File

@ -165,6 +165,7 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
@ -218,6 +219,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final UserGroupInformation ugi; final UserGroupInformation ugi;
volatile boolean clientRunning = true; volatile boolean clientRunning = true;
volatile long lastLeaseRenewal; volatile long lastLeaseRenewal;
volatile long lastSeenStateId;
private volatile FsServerDefaults serverDefaults; private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate; private volatile long serverDefaultsLastUpdate;
final String clientName; final String clientName;
@ -395,6 +397,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.saslClient = new SaslDataTransferClient( this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
Client.setAlignmentContext(new ClientGCIContext(this));
} }
/** /**

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
* This is the server side implementation responsible for passing
* state alignment info to clients.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
class GlobalStateIdContext implements AlignmentContext {
private final FSNamesystem namesystem;
/**
* Server side constructor.
* @param namesystem server side state provider
*/
GlobalStateIdContext(FSNamesystem namesystem) {
this.namesystem = namesystem;
}
/**
* Server side implementation for providing state alignment info.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
header.setStateId(namesystem.getLastWrittenTransactionId());
}
/**
* Server side implementation only provides state alignment info.
* It does not receive state alignment info therefore this does nothing.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
// Do nothing.
}
}

View File

@ -455,6 +455,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.setNumHandlers(handlerCount) .setNumHandlers(handlerCount)
.setVerbose(false) .setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager()) .setSecretManager(namesystem.getDelegationTokenSecretManager())
.setAlignmentContext(new GlobalStateIdContext(namesystem))
.build(); .build();
// Add all the RPC protocols that the namenode implements // Add all the RPC protocols that the namenode implements

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
/**
* Class is used to test server sending state alignment information to clients
* via RPC and likewise clients receiving and updating their last known
* state alignment info.
* These tests check that after a single RPC call a client will have caught up
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContext {
static final long BLOCK_SIZE = 64 * 1024;
private static final int NUMDATANODES = 3;
private static final Configuration CONF = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void startUpCluster() throws IOException {
// disable block scanner
CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// Set short retry timeouts so this test runs faster
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
.build();
cluster.waitActive();
}
@Before
public void before() throws IOException {
dfs = cluster.getFileSystem();
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (dfs != null) {
dfs.close();
dfs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@After
public void after() throws IOException {
dfs.close();
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnWrite() throws Exception {
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
long clientState = dfs.dfs.lastSeenStateId;
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
// Client and server state should be equal.
assertThat(clientState, is(postWriteState));
}
/**
* This test checks if after a client reads we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnRead() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
}
/**
* This test checks that a fresh client starts with no state and becomes
* updated of state from RPC call.
*/
@Test
public void testStateTransferOnFreshClient() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
}
}
}