HDFS-12977. [SBN read] Add stateId to RPC headers. Contributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2018-03-20 18:48:40 -07:00 committed by Chen Liang
parent 3932ac4ef7
commit 1eeca2d9fb
14 changed files with 383 additions and 12 deletions

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
* This interface intends to align the state between client and server
* via RPC communication.
*
* This should be implemented separately on the client side and server side
* and can be used to pass state information on RPC responses from server
* to client.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface AlignmentContext {
/**
* This is the intended server method call to implement to pass state info
* during RPC response header construction.
* @param header The RPC response header builder.
*/
void updateResponseState(RpcResponseHeaderProto.Builder header);
/**
* This is the intended client method call to implement to recieve state info
* during RPC response processing.
* @param header The RPC response header.
*/
void receiveResponseState(RpcResponseHeaderProto header);
}

View File

@ -103,6 +103,12 @@ public class Client implements AutoCloseable {
return false;
}
};
private static AlignmentContext alignmentContext;
/** Set alignment context to use to fetch state alignment info from RPC. */
public static void setAlignmentContext(AlignmentContext ac) {
alignmentContext = ac;
}
@SuppressWarnings("unchecked")
@Unstable
@ -1186,6 +1192,9 @@ public class Client implements AutoCloseable {
final Call call = calls.remove(callId);
call.setRpcResponse(value);
}
if (alignmentContext != null) {
alignmentContext.receiveResponseState(header);
}
// verify that packet length was correct
if (packet.remaining() > 0) {
throw new RpcClientException("RPC response length mismatch");

View File

@ -337,11 +337,11 @@ public class ProtobufRpcEngine implements RpcEngine {
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
portRangeConfig, alignmentContext);
}
public static class Server extends RPC.Server {
@ -410,18 +410,19 @@ public class ProtobufRpcEngine implements RpcEngine {
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
* @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port)
* @param alignmentContext provides server state info on client responses
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf,
serverNameFromClass(protocolImpl.getClass()), secretManager,
portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);

View File

@ -719,6 +719,7 @@ public class RPC {
private final Configuration conf;
private SecretManager<? extends TokenIdentifier> secretManager = null;
private String portRangeConfig = null;
private AlignmentContext alignmentContext = null;
public Builder(Configuration conf) {
this.conf = conf;
@ -785,6 +786,12 @@ public class RPC {
return this;
}
/** Default: null */
public Builder setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
return this;
}
/**
* Build the RPC Server.
* @throws IOException on error
@ -804,7 +811,8 @@ public class RPC {
return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
this.verbose, this.conf, this.secretManager, this.portRangeConfig,
this.alignmentContext);
}
}

View File

@ -67,6 +67,7 @@ public interface RpcEngine {
* @param secretManager The secret manager to use to validate incoming requests.
* @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port)
* @param alignmentContext provides server state info on client responses
* @return The Server instance
* @throws IOException on any error
*/
@ -75,8 +76,8 @@ public interface RpcEngine {
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig
) throws IOException;
String portRangeConfig,
AlignmentContext alignmentContext) throws IOException;
/**
* Returns a proxy for ProtocolMetaInfoPB, which uses the given connection

View File

@ -140,6 +140,7 @@ public abstract class Server {
private RpcSaslProto negotiateResponse;
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
private Tracer tracer;
private AlignmentContext alignmentContext;
/**
* Logical name of the server used in metrics and monitor.
*/
@ -163,6 +164,15 @@ public abstract class Server {
exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);
}
/**
* Set alignment context to pass state info thru RPC.
*
* @param alignmentContext alignment state context
*/
public void setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
}
/**
* ExceptionsHandler manages Exception groups for special handling
* e.g., terse exception group for concise logging messages
@ -2921,6 +2931,9 @@ public abstract class Server {
headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if(alignmentContext != null) {
alignmentContext.updateResponseState(headerBuilder);
}
if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build();

View File

@ -323,11 +323,11 @@ public class WritableRpcEngine implements RpcEngine {
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
portRangeConfig, alignmentContext);
}
@ -397,18 +397,45 @@ public class WritableRpcEngine implements RpcEngine {
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*
* @deprecated use Server#Server(Class, Object,
* Configuration, String, int, int, int, int, boolean, SecretManager)
*/
@Deprecated
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
this(null, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose,
secretManager, null, null);
}
/**
* Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
* @param alignmentContext provides server state info on client responses
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
serverNameFromClass(protocolImpl.getClass()), secretManager,
portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose;

View File

@ -155,6 +155,7 @@ message RpcResponseHeaderProto {
optional RpcErrorCodeProto errorDetail = 6; // in case of error
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
}
message RpcSaslProto {

View File

@ -300,7 +300,8 @@ public class TestRPC extends TestRpcBase {
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException {
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return null;
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.util.concurrent.atomic.LongAccumulator;
/**
* This is the client side implementation responsible for receiving
* state alignment info from server(s).
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
class ClientGCIContext implements AlignmentContext {
private final DFSClient dfsClient;
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
/**
* Client side constructor.
* @param dfsClient client side state receiver
*/
ClientGCIContext(DFSClient dfsClient) {
this.dfsClient = dfsClient;
}
/**
* Client side implementation only receives state alignment info.
* It does not provide state alignment info therefore this does nothing.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
// Do nothing.
}
/**
* Client side implementation for receiving state alignment info.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
dfsClient.lastSeenStateId = lastSeenStateId.get();
}
}

View File

@ -166,6 +166,7 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
@ -219,6 +220,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final UserGroupInformation ugi;
volatile boolean clientRunning = true;
volatile long lastLeaseRenewal;
volatile long lastSeenStateId;
private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
final String clientName;
@ -396,6 +398,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
Client.setAlignmentContext(new ClientGCIContext(this));
}
/**

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
* This is the server side implementation responsible for passing
* state alignment info to clients.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
class GlobalStateIdContext implements AlignmentContext {
private final FSNamesystem namesystem;
/**
* Server side constructor.
* @param namesystem server side state provider
*/
GlobalStateIdContext(FSNamesystem namesystem) {
this.namesystem = namesystem;
}
/**
* Server side implementation for providing state alignment info.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
header.setStateId(namesystem.getLastWrittenTransactionId());
}
/**
* Server side implementation only provides state alignment info.
* It does not receive state alignment info therefore this does nothing.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
// Do nothing.
}
}

View File

@ -455,6 +455,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.setAlignmentContext(new GlobalStateIdContext(namesystem))
.build();
// Add all the RPC protocols that the namenode implements

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
/**
* Class is used to test server sending state alignment information to clients
* via RPC and likewise clients receiving and updating their last known
* state alignment info.
* These tests check that after a single RPC call a client will have caught up
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContext {
static final long BLOCK_SIZE = 64 * 1024;
private static final int NUMDATANODES = 3;
private static final Configuration CONF = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void startUpCluster() throws IOException {
// disable block scanner
CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// Set short retry timeouts so this test runs faster
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
.build();
cluster.waitActive();
}
@Before
public void before() throws IOException {
dfs = cluster.getFileSystem();
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (dfs != null) {
dfs.close();
dfs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@After
public void after() throws IOException {
dfs.close();
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnWrite() throws Exception {
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
long clientState = dfs.dfs.lastSeenStateId;
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
// Client and server state should be equal.
assertThat(clientState, is(postWriteState));
}
/**
* This test checks if after a client reads we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnRead() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
}
/**
* This test checks that a fresh client starts with no state and becomes
* updated of state from RPC call.
*/
@Test
public void testStateTransferOnFreshClient() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
}
}
}