HADOOP-10376. Refactor refresh*Protocols into a single generic refreshConfigProtocol. (Contributed by Chris Li)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1602055 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-06-12 01:27:39 +00:00
parent 2584d531e1
commit 34e9173c00
17 changed files with 938 additions and 2 deletions

View File

@ -417,6 +417,9 @@ Release 2.5.0 - UNRELEASED
TCP RST and miss session expiration event due to bug in client connection TCP RST and miss session expiration event due to bug in client connection
management. (cnauroth) management. (cnauroth)
HADOOP-10376. Refactor refresh*Protocols into a single generic
refreshConfigProtocol. (Chris Li via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -287,6 +287,10 @@
<!-- protobuf generated code --> <!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/> <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
</Match> </Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
</Match>
<!-- <!--
Manually checked, misses child thread manually syncing on parent's intrinsic lock. Manually checked, misses child thread manually syncing on parent's intrinsic lock.

View File

@ -318,6 +318,7 @@
<include>RefreshAuthorizationPolicyProtocol.proto</include> <include>RefreshAuthorizationPolicyProtocol.proto</include>
<include>RefreshUserMappingsProtocol.proto</include> <include>RefreshUserMappingsProtocol.proto</include>
<include>RefreshCallQueueProtocol.proto</include> <include>RefreshCallQueueProtocol.proto</include>
<include>GenericRefreshProtocol.proto</include>
</includes> </includes>
</source> </source>
<output>${project.build.directory}/generated-sources/java</output> <output>${project.build.directory}/generated-sources/java</output>

View File

@ -143,6 +143,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE = HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE =
"security.refresh.callqueue.protocol.acl"; "security.refresh.callqueue.protocol.acl";
public static final String public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH =
"security.refresh.generic.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl"; SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
public static final String public static final String
SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl"; SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol which is used to refresh arbitrary things at runtime.
*/
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface GenericRefreshProtocol {
/**
* Version 1: Initial version.
*/
public static final long versionID = 1L;
/**
* Refresh the resource based on identity passed in.
* @throws IOException
*/
@Idempotent
Collection<RefreshResponse> refresh(String identifier, String[] args)
throws IOException;
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Used to registry custom methods to refresh at runtime.
*/
@InterfaceStability.Unstable
public interface RefreshHandler {
/**
* Implement this method to accept refresh requests from the administrator.
* @param identifier is the identifier you registered earlier
* @param args contains a list of string args from the administrator
* @throws Exception as a shorthand for a RefreshResponse(-1, message)
* @return a RefreshResponse
*/
RefreshResponse handleRefresh(String identifier, String[] args);
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.util.ArrayList;
import java.util.Collection;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Used to registry custom methods to refresh at runtime.
* Each identifier maps to one or more RefreshHandlers.
*/
@InterfaceStability.Unstable
public class RefreshRegistry {
public static final Log LOG = LogFactory.getLog(RefreshRegistry.class);
// Used to hold singleton instance
private static class RegistryHolder {
@SuppressWarnings("All")
public static RefreshRegistry registry = new RefreshRegistry();
}
// Singleton access
public static RefreshRegistry defaultRegistry() {
return RegistryHolder.registry;
}
private final Multimap<String, RefreshHandler> handlerTable;
public RefreshRegistry() {
handlerTable = HashMultimap.create();
}
/**
* Registers an object as a handler for a given identity.
* Note: will prevent handler from being GC'd, object should unregister itself
* when done
* @param identifier a unique identifier for this resource,
* such as org.apache.hadoop.blacklist
* @param handler the object to register
*/
public synchronized void register(String identifier, RefreshHandler handler) {
if (identifier == null) {
throw new NullPointerException("Identifier cannot be null");
}
handlerTable.put(identifier, handler);
}
/**
* Remove the registered object for a given identity.
* @param identifier the resource to unregister
* @return the true if removed
*/
public synchronized boolean unregister(String identifier, RefreshHandler handler) {
return handlerTable.remove(identifier, handler);
}
public synchronized void unregisterAll(String identifier) {
handlerTable.removeAll(identifier);
}
/**
* Lookup the responsible handler and return its result.
* This should be called by the RPC server when it gets a refresh request.
* @param identifier the resource to refresh
* @param args the arguments to pass on, not including the program name
* @throws IllegalArgumentException on invalid identifier
* @return the response from the appropriate handler
*/
public synchronized Collection<RefreshResponse> dispatch(String identifier, String[] args) {
Collection<RefreshHandler> handlers = handlerTable.get(identifier);
if (handlers.size() == 0) {
String msg = "Identifier '" + identifier +
"' does not exist in RefreshRegistry. Valid options are: " +
Joiner.on(", ").join(handlerTable.keySet());
throw new IllegalArgumentException(msg);
}
ArrayList<RefreshResponse> responses =
new ArrayList<RefreshResponse>(handlers.size());
// Dispatch to each handler and store response
for(RefreshHandler handler : handlers) {
RefreshResponse response;
// Run the handler
try {
response = handler.handleRefresh(identifier, args);
if (response == null) {
throw new NullPointerException("Handler returned null.");
}
LOG.info(handlerName(handler) + " responds to '" + identifier +
"', says: '" + response.getMessage() + "', returns " +
response.getReturnCode());
} catch (Exception e) {
response = new RefreshResponse(-1, e.getLocalizedMessage());
}
response.setSenderName(handlerName(handler));
responses.add(response);
}
return responses;
}
private String handlerName(RefreshHandler h) {
return h.getClass().getName() + '@' + Integer.toHexString(h.hashCode());
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Return a response in the handler method for the user to see.
* Useful since you may want to display status to a user even though an
* error has not occurred.
*/
@InterfaceStability.Unstable
public class RefreshResponse {
private int returnCode = -1;
private String message;
private String senderName;
/**
* Convenience method to create a response for successful refreshes.
* @return void response
*/
public static RefreshResponse successResponse() {
return new RefreshResponse(0, "Success");
}
// Most RefreshHandlers will use this
public RefreshResponse(int returnCode, String message) {
this.returnCode = returnCode;
this.message = message;
}
/**
* Optionally set the sender of this RefreshResponse.
* This helps clarify things when multiple handlers respond.
* @param name The name of the sender
*/
public void setSenderName(String name) {
senderName = name;
}
public String getSenderName() { return senderName; }
public int getReturnCode() { return returnCode; }
public void setReturnCode(int rc) { returnCode = rc; }
public void setMessage(String m) { message = m; }
public String getMessage() { return message; }
@Override
public String toString() {
String ret = "";
if (senderName != null) {
ret += senderName + ": ";
}
if (message != null) {
ret += message;
}
ret += " (exit " + returnCode + ")";
return ret;
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class GenericRefreshProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GenericRefreshProtocol, Closeable {
/** RpcController is not used and hence is set to null. */
private final static RpcController NULL_CONTROLLER = null;
private final GenericRefreshProtocolPB rpcProxy;
public GenericRefreshProtocolClientSideTranslatorPB(
GenericRefreshProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public Collection<RefreshResponse> refresh(String identifier, String[] args) throws IOException {
List<String> argList = Arrays.asList(args);
try {
GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
.setIdentifier(identifier)
.addAllArgs(argList)
.build();
GenericRefreshResponseCollectionProto resp = rpcProxy.refresh(NULL_CONTROLLER, request);
return unpack(resp);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
private Collection<RefreshResponse> unpack(GenericRefreshResponseCollectionProto collection) {
List<GenericRefreshResponseProto> responseProtos = collection.getResponsesList();
List<RefreshResponse> responses = new ArrayList<RefreshResponse>();
for (GenericRefreshResponseProto rp : responseProtos) {
RefreshResponse response = unpack(rp);
responses.add(response);
}
return responses;
}
private RefreshResponse unpack(GenericRefreshResponseProto proto) {
// The default values
String message = null;
String sender = null;
int returnCode = -1;
// ... that can be overridden by data from the protobuf
if (proto.hasUserMessage()) {
message = proto.getUserMessage();
}
if (proto.hasExitStatus()) {
returnCode = proto.getExitStatus();
}
if (proto.hasSenderName()) {
sender = proto.getSenderName();
}
// ... and put into a RefreshResponse
RefreshResponse response = new RefreshResponse(returnCode, message);
response.setSenderName(sender);
return response;
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
GenericRefreshProtocolPB.class,
RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(GenericRefreshProtocolPB.class),
methodName);
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.ipc.GenericRefreshProtocol",
protocolVersion = 1)
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Evolving
public interface GenericRefreshProtocolPB extends
GenericRefreshProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.protocolPB;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class GenericRefreshProtocolServerSideTranslatorPB implements
GenericRefreshProtocolPB {
private final GenericRefreshProtocol impl;
public GenericRefreshProtocolServerSideTranslatorPB(
GenericRefreshProtocol impl) {
this.impl = impl;
}
@Override
public GenericRefreshResponseCollectionProto refresh(
RpcController controller, GenericRefreshRequestProto request)
throws ServiceException {
try {
List<String> argList = request.getArgsList();
String[] args = argList.toArray(new String[argList.size()]);
if (!request.hasIdentifier()) {
throw new ServiceException("Request must contain identifier");
}
Collection<RefreshResponse> results = impl.refresh(request.getIdentifier(), args);
return pack(results);
} catch (IOException e) {
throw new ServiceException(e);
}
}
// Convert a collection of RefreshResponse objects to a
// RefreshResponseCollection proto
private GenericRefreshResponseCollectionProto pack(
Collection<RefreshResponse> responses) {
GenericRefreshResponseCollectionProto.Builder b =
GenericRefreshResponseCollectionProto.newBuilder();
for (RefreshResponse response : responses) {
GenericRefreshResponseProto.Builder respBuilder =
GenericRefreshResponseProto.newBuilder();
respBuilder.setExitStatus(response.getReturnCode());
respBuilder.setUserMessage(response.getMessage());
respBuilder.setSenderName(response.getSenderName());
// Add to collection
b.addResponses(respBuilder);
}
return b.build();
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
option java_package = "org.apache.hadoop.ipc.proto";
option java_outer_classname = "GenericRefreshProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.common;
/**
* Refresh request.
*/
message GenericRefreshRequestProto {
optional string identifier = 1;
repeated string args = 2;
}
/**
* A single response from a refresh handler.
*/
message GenericRefreshResponseProto {
optional int32 exitStatus = 1; // unix exit status to return
optional string userMessage = 2; // to be displayed to the user
optional string senderName = 3; // which handler sent this message
}
/**
* Collection of responses from zero or more handlers.
*/
message GenericRefreshResponseCollectionProto {
repeated GenericRefreshResponseProto responses = 1;
}
/**
* Protocol which is used to refresh a user-specified feature.
*/
service GenericRefreshProtocolService {
rpc refresh(GenericRefreshRequestProto)
returns(GenericRefreshResponseCollectionProto);
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
/** /**
* {@link PolicyProvider} for HDFS protocols. * {@link PolicyProvider} for HDFS protocols.
@ -68,7 +69,10 @@ public class HDFSPolicyProvider extends PolicyProvider {
GetUserMappingsProtocol.class), GetUserMappingsProtocol.class),
new Service( new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE, CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE,
RefreshCallQueueProtocol.class) RefreshCallQueueProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH,
GenericRefreshProtocol.class)
}; };
@Override @Override

View File

@ -132,6 +132,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
@ -147,6 +149,9 @@ import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSi
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@ -229,6 +234,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
BlockingService refreshCallQueueService = RefreshCallQueueProtocolService BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
.newReflectiveBlockingService(refreshCallQueueXlator); .newReflectiveBlockingService(refreshCallQueueXlator);
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
new GenericRefreshProtocolServerSideTranslatorPB(this);
BlockingService genericRefreshService = GenericRefreshProtocolService
.newReflectiveBlockingService(genericRefreshXlator);
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
new GetUserMappingsProtocolServerSideTranslatorPB(this); new GetUserMappingsProtocolServerSideTranslatorPB(this);
BlockingService getUserMappingService = GetUserMappingsProtocolService BlockingService getUserMappingService = GetUserMappingsProtocolService
@ -278,6 +288,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
// We support Refreshing call queue here in case the client RPC queue is full // We support Refreshing call queue here in case the client RPC queue is full
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, serviceRpcServer); refreshCallQueueService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer); getUserMappingService, serviceRpcServer);
@ -322,6 +334,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
refreshUserMappingService, clientRpcServer); refreshUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, clientRpcServer); refreshCallQueueService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, clientRpcServer); getUserMappingService, clientRpcServer);
@ -1155,6 +1169,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
} }
@Override // GenericRefreshProtocol
public Collection<RefreshResponse> refresh(String identifier, String[] args) {
// Let the registry handle as needed
return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
}
@Override // GetUserMappingsProtocol @Override // GetUserMappingsProtocol
public String[] getGroupsForUser(String user) throws IOException { public String[] getGroupsForUser(String user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol;
/** The full set of RPC methods implemented by the Namenode. */ /** The full set of RPC methods implemented by the Namenode. */
@ -35,6 +36,7 @@ public interface NamenodeProtocols
RefreshAuthorizationPolicyProtocol, RefreshAuthorizationPolicyProtocol,
RefreshUserMappingsProtocol, RefreshUserMappingsProtocol,
RefreshCallQueueProtocol, RefreshCallQueueProtocol,
GenericRefreshProtocol,
GetUserMappingsProtocol, GetUserMappingsProtocol,
HAServiceProtocol { HAServiceProtocol {
} }

View File

@ -26,6 +26,7 @@ import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -62,12 +63,17 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -688,6 +694,7 @@ public class DFSAdmin extends FsShell {
"\t[-refreshUserToGroupsMappings]\n" + "\t[-refreshUserToGroupsMappings]\n" +
"\t[-refreshSuperUserGroupsConfiguration]\n" + "\t[-refreshSuperUserGroupsConfiguration]\n" +
"\t[-refreshCallQueue]\n" + "\t[-refreshCallQueue]\n" +
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
"\t[-printTopology]\n" + "\t[-printTopology]\n" +
"\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-refreshNamenodes datanodehost:port]\n"+
"\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+ "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
@ -764,6 +771,10 @@ public class DFSAdmin extends FsShell {
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
"\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
"\ton <hostname:port>. All other args after are sent to the host.";
String printTopology = "-printTopology: Print a tree of the racks and their\n" + String printTopology = "-printTopology: Print a tree of the racks and their\n" +
"\t\tnodes as reported by the Namenode\n"; "\t\tnodes as reported by the Namenode\n";
@ -848,6 +859,8 @@ public class DFSAdmin extends FsShell {
System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshCallQueue".equals(cmd)) { } else if ("refreshCallQueue".equals(cmd)) {
System.out.println(refreshCallQueue); System.out.println(refreshCallQueue);
} else if ("refresh".equals(cmd)) {
System.out.println(genericRefresh);
} else if ("printTopology".equals(cmd)) { } else if ("printTopology".equals(cmd)) {
System.out.println(printTopology); System.out.println(printTopology);
} else if ("refreshNamenodes".equals(cmd)) { } else if ("refreshNamenodes".equals(cmd)) {
@ -887,6 +900,7 @@ public class DFSAdmin extends FsShell {
System.out.println(refreshUserToGroupsMappings); System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshCallQueue); System.out.println(refreshCallQueue);
System.out.println(genericRefresh);
System.out.println(printTopology); System.out.println(printTopology);
System.out.println(refreshNamenodes); System.out.println(refreshNamenodes);
System.out.println(deleteBlockPool); System.out.println(deleteBlockPool);
@ -1100,6 +1114,56 @@ public class DFSAdmin extends FsShell {
return 0; return 0;
} }
public int genericRefresh(String[] argv, int i) throws IOException {
String hostport = argv[i++];
String identifier = argv[i++];
String[] args = Arrays.copyOfRange(argv, i, argv.length);
// Get the current configuration
Configuration conf = getConf();
// for security authorization
// server principal for this call
// should be NN's one.
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client
Class<?> xface = GenericRefreshProtocolPB.class;
InetSocketAddress address = NetUtils.createSocketAddr(hostport);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)
RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0);
GenericRefreshProtocol xlator =
new GenericRefreshProtocolClientSideTranslatorPB(proxy);
// Refresh
Collection<RefreshResponse> responses = xlator.refresh(identifier, args);
int returnCode = 0;
// Print refresh responses
System.out.println("Refresh Responses:\n");
for (RefreshResponse response : responses) {
System.out.println(response.toString());
if (returnCode == 0 && response.getReturnCode() != 0) {
// This is the first non-zero return code, so we should return this
returnCode = response.getReturnCode();
} else if (returnCode != 0 && response.getReturnCode() != 0) {
// Then now we have multiple non-zero return codes,
// so we merge them into -1
returnCode = -1;
}
}
return returnCode;
}
/** /**
* Displays format of commands. * Displays format of commands.
* @param cmd The command that is being executed. * @param cmd The command that is being executed.
@ -1162,6 +1226,9 @@ public class DFSAdmin extends FsShell {
} else if ("-refreshCallQueue".equals(cmd)) { } else if ("-refreshCallQueue".equals(cmd)) {
System.err.println("Usage: java DFSAdmin" System.err.println("Usage: java DFSAdmin"
+ " [-refreshCallQueue]"); + " [-refreshCallQueue]");
} else if ("-refresh".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
} else if ("-printTopology".equals(cmd)) { } else if ("-printTopology".equals(cmd)) {
System.err.println("Usage: java DFSAdmin" System.err.println("Usage: java DFSAdmin"
+ " [-printTopology]"); + " [-printTopology]");
@ -1195,6 +1262,7 @@ public class DFSAdmin extends FsShell {
System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshCallQueue]"); System.err.println(" [-refreshCallQueue]");
System.err.println(" [-refresh]");
System.err.println(" [-printTopology]"); System.err.println(" [-printTopology]");
System.err.println(" [-refreshNamenodes datanodehost:port]"); System.err.println(" [-refreshNamenodes datanodehost:port]");
System.err.println(" [-deleteBlockPool datanode-host:port blockpoolId [force]]"); System.err.println(" [-deleteBlockPool datanode-host:port blockpoolId [force]]");
@ -1292,6 +1360,11 @@ public class DFSAdmin extends FsShell {
printUsage(cmd); printUsage(cmd);
return exitCode; return exitCode;
} }
} else if ("-refresh".equals(cmd)) {
if (argv.length < 3) {
printUsage(cmd);
return exitCode;
}
} else if ("-refreshUserToGroupsMappings".equals(cmd)) { } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
if (argv.length != 1) { if (argv.length != 1) {
printUsage(cmd); printUsage(cmd);
@ -1387,6 +1460,8 @@ public class DFSAdmin extends FsShell {
exitCode = refreshSuperUserGroupsConfiguration(); exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshCallQueue".equals(cmd)) { } else if ("-refreshCallQueue".equals(cmd)) {
exitCode = refreshCallQueue(); exitCode = refreshCallQueue();
} else if ("-refresh".equals(cmd)) {
exitCode = genericRefresh(argv, i);
} else if ("-printTopology".equals(cmd)) { } else if ("-printTopology".equals(cmd)) {
exitCode = printTopology(); exitCode = printTopology();
} else if ("-refreshNamenodes".equals(cmd)) { } else if ("-refreshNamenodes".equals(cmd)) {

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.ipc.RefreshHandler;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.mockito.Mockito;
/**
* Before all tests, a MiniDFSCluster is spun up.
* Before each test, mock refresh handlers are created and registered.
* After each test, the mock handlers are unregistered.
* After all tests, the cluster is spun down.
*/
public class TestGenericRefresh {
private static MiniDFSCluster cluster;
private static Configuration config;
private static final int NNPort = 54222;
private static RefreshHandler firstHandler;
private static RefreshHandler secondHandler;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
config = new Configuration();
config.set("hadoop.security.authorization", "true");
FileSystem.setDefaultUri(config, "hdfs://localhost:" + NNPort);
cluster = new MiniDFSCluster.Builder(config).nameNodePort(NNPort).build();
cluster.waitActive();
}
@AfterClass
public static void tearDownBeforeClass() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void setUp() throws Exception {
// Register Handlers, first one just sends an ok response
firstHandler = Mockito.mock(RefreshHandler.class);
Mockito.stub(firstHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
.toReturn(RefreshResponse.successResponse());
RefreshRegistry.defaultRegistry().register("firstHandler", firstHandler);
// Second handler has conditional response for testing args
secondHandler = Mockito.mock(RefreshHandler.class);
Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one", "two"}))
.toReturn(new RefreshResponse(3, "three"));
Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one"}))
.toReturn(new RefreshResponse(2, "two"));
RefreshRegistry.defaultRegistry().register("secondHandler", secondHandler);
}
@After
public void tearDown() throws Exception {
RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
RefreshRegistry.defaultRegistry().unregisterAll("secondHandler");
}
@Test
public void testInvalidCommand() throws Exception {
DFSAdmin admin = new DFSAdmin(config);
String [] args = new String[]{"-refresh", "nn"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should fail due to bad args", -1, exitCode);
}
@Test
public void testInvalidIdentifier() throws Exception {
DFSAdmin admin = new DFSAdmin(config);
String [] args = new String[]{"-refresh", "localhost:" + NNPort, "unregisteredIdentity"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should fail due to no handler registered", -1, exitCode);
}
@Test
public void testValidIdentifier() throws Exception {
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should succeed", 0, exitCode);
Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{});
// Second handler was never called
Mockito.verify(secondHandler, Mockito.never())
.handleRefresh(Mockito.anyString(), Mockito.any(String[].class));
}
@Test
public void testVariableArgs() throws Exception {
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should return 2", 2, exitCode);
exitCode = admin.run(new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one", "two"});
assertEquals("DFSAdmin should now return 3", 3, exitCode);
Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one"});
Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one", "two"});
}
@Test
public void testUnregistration() throws Exception {
RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
// And now this should fail
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should return -1", -1, exitCode);
}
@Test
public void testUnregistrationReturnValue() {
RefreshHandler mockHandler = Mockito.mock(RefreshHandler.class);
RefreshRegistry.defaultRegistry().register("test", mockHandler);
boolean ret = RefreshRegistry.defaultRegistry().unregister("test", mockHandler);
assertTrue(ret);
}
@Test
public void testMultipleRegistration() throws Exception {
RefreshRegistry.defaultRegistry().register("sharedId", firstHandler);
RefreshRegistry.defaultRegistry().register("sharedId", secondHandler);
// this should trigger both
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "sharedId", "one"};
int exitCode = admin.run(args);
assertEquals(-1, exitCode); // -1 because one of the responses is unregistered
// verify we called both
Mockito.verify(firstHandler).handleRefresh("sharedId", new String[]{"one"});
Mockito.verify(secondHandler).handleRefresh("sharedId", new String[]{"one"});
RefreshRegistry.defaultRegistry().unregisterAll("sharedId");
}
@Test
public void testMultipleReturnCodeMerging() throws Exception {
// Two handlers which return two non-zero values
RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class);
Mockito.stub(handlerOne.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
.toReturn(new RefreshResponse(23, "Twenty Three"));
RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class);
Mockito.stub(handlerTwo.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
.toReturn(new RefreshResponse(10, "Ten"));
// Then registered to the same ID
RefreshRegistry.defaultRegistry().register("shared", handlerOne);
RefreshRegistry.defaultRegistry().register("shared", handlerTwo);
// We refresh both
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "shared"};
int exitCode = admin.run(args);
assertEquals(-1, exitCode); // We get -1 because of our logic for melding non-zero return codes
// Verify we called both
Mockito.verify(handlerOne).handleRefresh("shared", new String[]{});
Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{});
RefreshRegistry.defaultRegistry().unregisterAll("shared");
}
@Test
public void testExceptionResultsInNormalError() throws Exception {
// In this test, we ensure that all handlers are called even if we throw an exception in one
RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class);
Mockito.stub(exceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
.toThrow(new RuntimeException("Exceptional Handler Throws Exception"));
RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class);
Mockito.stub(otherExceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
.toThrow(new RuntimeException("More Exceptions"));
RefreshRegistry.defaultRegistry().register("exceptional", exceptionalHandler);
RefreshRegistry.defaultRegistry().register("exceptional", otherExceptionalHandler);
DFSAdmin admin = new DFSAdmin(config);
String[] args = new String[]{"-refresh", "localhost:" + NNPort, "exceptional"};
int exitCode = admin.run(args);
assertEquals(-1, exitCode); // Exceptions result in a -1
Mockito.verify(exceptionalHandler).handleRefresh("exceptional", new String[]{});
Mockito.verify(otherExceptionalHandler).handleRefresh("exceptional", new String[]{});
RefreshRegistry.defaultRegistry().unregisterAll("exceptional");
}
}