HDFS-2586. Add protobuf service and implementation for HAServiceProtocol. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1245338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-02-17 07:37:43 +00:00
parent 1a03127385
commit 7933dc5838
9 changed files with 493 additions and 11 deletions

View File

@ -31,6 +31,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
@ -239,9 +240,7 @@ public abstract class HAAdmin extends Configured implements Tool {
throws IOException {
String serviceAddr = getServiceAddr(serviceId);
InetSocketAddress addr = NetUtils.createSocketAddr(serviceAddr);
return (HAServiceProtocol)RPC.getProxy(
HAServiceProtocol.class, HAServiceProtocol.versionID,
addr, getConf());
return new HAServiceProtocolClientSideTranslatorPB(addr, getConf());
}
@Override

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStateRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.ReadyToBecomeActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link HAServiceProtocol} interfaces to the RPC server implementing
* {@link HAServiceProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class HAServiceProtocolClientSideTranslatorPB implements
HAServiceProtocol, Closeable {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ =
MonitorHealthRequestProto.newBuilder().build();
private final static TransitionToActiveRequestProto TRANSITION_TO_ACTIVE_REQ =
TransitionToActiveRequestProto.newBuilder().build();
private final static TransitionToStandbyRequestProto TRANSITION_TO_STANDBY_REQ =
TransitionToStandbyRequestProto.newBuilder().build();
private final static GetServiceStateRequestProto GET_SERVICE_STATE_REQ =
GetServiceStateRequestProto.newBuilder().build();
private final static ReadyToBecomeActiveRequestProto ACTIVE_READY_REQ =
ReadyToBecomeActiveRequestProto.newBuilder().build();
private final HAServiceProtocolPB rpcProxy;
public HAServiceProtocolClientSideTranslatorPB(InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return rpcProxy.getProtocolVersion(protocol, clientVersion);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return rpcProxy.getProtocolSignature(protocol, clientVersion,
clientMethodsHash);
}
@Override
public void monitorHealth() throws IOException {
try {
rpcProxy.monitorHealth(NULL_CONTROLLER, MONITOR_HEALTH_REQ);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void transitionToActive() throws IOException {
try {
rpcProxy.transitionToActive(NULL_CONTROLLER, TRANSITION_TO_ACTIVE_REQ);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void transitionToStandby() throws IOException {
try {
rpcProxy.transitionToStandby(NULL_CONTROLLER, TRANSITION_TO_STANDBY_REQ);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public HAServiceState getServiceState() throws IOException {
HAServiceStateProto state;
try {
state = rpcProxy.getServiceState(NULL_CONTROLLER,
GET_SERVICE_STATE_REQ).getState();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
switch(state) {
case ACTIVE:
return HAServiceState.ACTIVE;
case STANDBY:
return HAServiceState.STANDBY;
case INITIALIZING:
default:
return HAServiceState.INITIALIZING;
}
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public boolean readyToBecomeActive() throws IOException {
try {
return rpcProxy.readyToBecomeActive(NULL_CONTROLLER, ACTIVE_READY_REQ)
.getReadyToBecomeActive();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(protocolName = "org.apache.hadoop.ha.HAServiceProtocol",
protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface HAServiceProtocolPB extends
HAServiceProtocolService.BlockingInterface, VersionedProtocol {
/**
* If any methods need annotation, it can be added here
*/
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStateRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStateResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.ReadyToBecomeActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.ReadyToBecomeActiveResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyResponseProto;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is used on the server side. Calls come across the wire for the
* for protocol {@link HAServiceProtocolPB}.
* This class translates the PB data types
* to the native data types used inside the NN as specified in the generic
* ClientProtocol.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class HAServiceProtocolServerSideTranslatorPB implements
HAServiceProtocolPB {
private final HAServiceProtocol server;
private static final MonitorHealthResponseProto MONITOR_HEALTH_RESP =
MonitorHealthResponseProto.newBuilder().build();
private static final TransitionToActiveResponseProto TRANSITION_TO_ACTIVE_RESP =
TransitionToActiveResponseProto.newBuilder().build();
private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP =
TransitionToStandbyResponseProto.newBuilder().build();
public HAServiceProtocolServerSideTranslatorPB(HAServiceProtocol server) {
this.server = server;
}
@Override
public MonitorHealthResponseProto monitorHealth(RpcController controller,
MonitorHealthRequestProto request) throws ServiceException {
try {
server.monitorHealth();
return MONITOR_HEALTH_RESP;
} catch(IOException e) {
throw new ServiceException(e);
}
}
@Override
public TransitionToActiveResponseProto transitionToActive(
RpcController controller, TransitionToActiveRequestProto request)
throws ServiceException {
try {
server.transitionToActive();
return TRANSITION_TO_ACTIVE_RESP;
} catch(IOException e) {
throw new ServiceException(e);
}
}
@Override
public TransitionToStandbyResponseProto transitionToStandby(
RpcController controller, TransitionToStandbyRequestProto request)
throws ServiceException {
try {
server.transitionToStandby();
return TRANSITION_TO_STANDBY_RESP;
} catch(IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetServiceStateResponseProto getServiceState(RpcController controller,
GetServiceStateRequestProto request) throws ServiceException {
HAServiceState s;
try {
s = server.getServiceState();
} catch(IOException e) {
throw new ServiceException(e);
}
HAServiceStateProto ret;
switch (s) {
case ACTIVE:
ret = HAServiceStateProto.ACTIVE;
break;
case STANDBY:
ret = HAServiceStateProto.STANDBY;
break;
case INITIALIZING:
default:
ret = HAServiceStateProto.INITIALIZING;
break;
}
return GetServiceStateResponseProto.newBuilder().setState(ret).build();
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return RPC.getProtocolVersion(HAServiceProtocolPB.class);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
if (!protocol.equals(RPC.getProtocolName(HAServiceProtocolPB.class))) {
throw new IOException("Serverside implements " +
RPC.getProtocolName(HAServiceProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(HAServiceProtocolPB.class),
HAServiceProtocolPB.class);
}
@Override
public ReadyToBecomeActiveResponseProto readyToBecomeActive(
RpcController controller, ReadyToBecomeActiveRequestProto request)
throws ServiceException {
try {
return ReadyToBecomeActiveResponseProto.newBuilder()
.setReadyToBecomeActive(server.readyToBecomeActive()).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ha.proto";
option java_outer_classname = "HAServiceProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
enum HAServiceStateProto {
INITIALIZING = 0;
ACTIVE = 1;
STANDBY = 2;
}
/**
* void request
*/
message MonitorHealthRequestProto {
}
/**
* void response
*/
message MonitorHealthResponseProto {
}
/**
* void request
*/
message TransitionToActiveRequestProto {
}
/**
* void response
*/
message TransitionToActiveResponseProto {
}
/**
* void request
*/
message TransitionToStandbyRequestProto {
}
/**
* void response
*/
message TransitionToStandbyResponseProto {
}
/**
* void request
*/
message GetServiceStateRequestProto {
}
/**
* Returns the state of the service
*/
message GetServiceStateResponseProto {
required HAServiceStateProto state = 1;
}
/**
* void request
*/
message ReadyToBecomeActiveRequestProto {
}
/**
* Returns true if service is ready to become active
*/
message ReadyToBecomeActiveResponseProto {
required bool readyToBecomeActive = 1;
}
/**
* Protocol interface provides High availability related
* primitives to monitor and failover a service.
*
* For details see o.a.h.ha.HAServiceProtocol.
*/
service HAServiceProtocolService {
/**
* Monitor the health of a service.
*/
rpc monitorHealth(MonitorHealthRequestProto)
returns(MonitorHealthResponseProto);
/**
* Request service to tranisition to active state.
*/
rpc transitionToActive(TransitionToActiveRequestProto)
returns(TransitionToActiveResponseProto);
/**
* Request service to transition to standby state.
*/
rpc transitionToStandby(TransitionToStandbyRequestProto)
returns(TransitionToStandbyResponseProto);
/**
* Get the current state of the service.
*/
rpc getServiceState(GetServiceStateRequestProto)
returns(GetServiceStateResponseProto);
/**
* Check if the service is ready to become active
*/
rpc readyToBecomeActive(ReadyToBecomeActiveRequestProto)
returns(ReadyToBecomeActiveResponseProto);
}

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysFailFencer;
import static org.apache.hadoop.ha.TestNodeFencer.setupFencer;
@ -285,8 +286,7 @@ public class TestFailoverController {
Configuration conf = new Configuration();
// Lower the timeout so we quickly fail to connect
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
return (HAServiceProtocol)RPC.getProxy(
HAServiceProtocol.class, HAServiceProtocol.versionID, addr, conf);
return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
}
@Test

View File

@ -214,3 +214,5 @@ HDFS-2928. ConfiguredFailoverProxyProvider should not create a NameNode proxy wi
HDFS-2955. IllegalStateException during standby startup in getCurSegmentTxId. (Hari Mankude via atm)
HDFS-2937. TestDFSHAAdmin needs tests with MiniDFSCluster. (Brandon Li via suresh)
HDFS-2586. Add protobuf service and implementation for HAServiceProtocol. (suresh via atm)

View File

@ -42,6 +42,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
@ -194,6 +197,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
new GetUserMappingsProtocolServerSideTranslatorPB(this);
BlockingService getUserMappingService = GetUserMappingsProtocolService
.newReflectiveBlockingService(getUserMappingXlator);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
@ -209,8 +217,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
HAServiceProtocol.class, this);
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@ -234,8 +242,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
clientNNPbService, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf,
namesystem.getDelegationTokenSecretManager());
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
HAServiceProtocol.class, this);
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,

View File

@ -49,6 +49,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocolHelper;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -1590,8 +1591,7 @@ public class MiniDFSCluster {
private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
return RPC.getProxy(HAServiceProtocol.class,
HAServiceProtocol.versionID, addr, conf);
return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
}
public void transitionToActive(int nnIndex) throws IOException,