YARN-2189. [YARN-1492] Admin service for cache manager. (Chris Trezzo via kasha)
(cherry picked from commit 78968155d7
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/bin/yarn
This commit is contained in:
parent
448f925fc0
commit
aadd0c392b
|
@ -24,6 +24,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2188. [YARN-1492] Client service for cache manager.
|
||||
(Chris Trezzo and Sangjin Lee via kasha)
|
||||
|
||||
YARN-2189. [YARN-1492] Admin service for cache manager.
|
||||
(Chris Trezzo via kasha)
|
||||
|
||||
YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
|
||||
via jianhe)
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ function print_usage(){
|
|||
echo " timelineserver run the timeline server"
|
||||
echo " rmadmin admin tools"
|
||||
echo " sharedcachemanager run the SharedCacheManager daemon"
|
||||
echo " scmadmin SharedCacheManager admin tools"
|
||||
echo " version print the version"
|
||||
echo " jar <jar> run a jar file"
|
||||
echo " application prints application(s)"
|
||||
|
@ -102,7 +103,104 @@ case $COMMAND in
|
|||
--help|-help|-h)
|
||||
print_usage
|
||||
exit
|
||||
<<<<<<< HEAD
|
||||
;;
|
||||
=======
|
||||
;;
|
||||
daemonlog)
|
||||
CLASS=org.apache.hadoop.log.LogLevel
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
jar)
|
||||
CLASS=org.apache.hadoop.util.RunJar
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
historyserver)
|
||||
supportdaemonization="true"
|
||||
echo "DEPRECATED: Use of this command to start the timeline server is deprecated." 1>&2
|
||||
echo "Instead use the timelineserver command for it." 1>&2
|
||||
echo "Starting the History Server anyway..." 1>&2
|
||||
CLASS='org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer'
|
||||
;;
|
||||
logs)
|
||||
CLASS=org.apache.hadoop.yarn.client.cli.LogsCLI
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
node)
|
||||
CLASS=org.apache.hadoop.yarn.client.cli.NodeCLI
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
nodemanager)
|
||||
supportdaemonization="true"
|
||||
CLASS='org.apache.hadoop.yarn.server.nodemanager.NodeManager'
|
||||
hadoop_debug "Append YARN_NODEMANAGER_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_NODEMANAGER_OPTS}"
|
||||
if [[ -n "${YARN_NODEMANAGER_HEAPSIZE}" ]]; then
|
||||
JAVA_HEAP_MAX="-Xmx${YARN_NODEMANAGER_HEAPSIZE}m"
|
||||
fi
|
||||
;;
|
||||
proxyserver)
|
||||
supportdaemonization="true"
|
||||
CLASS='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
|
||||
hadoop_debug "Append YARN_PROXYSERVER_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_PROXYSERVER_OPTS}"
|
||||
if [[ -n "${YARN_PROXYSERVER_HEAPSIZE}" ]]; then
|
||||
JAVA_HEAP_MAX="-Xmx${YARN_PROXYSERVER_HEAPSIZE}m"
|
||||
fi
|
||||
;;
|
||||
queue)
|
||||
CLASS=org.apache.hadoop.yarn.client.cli.QueueCLI
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
resourcemanager)
|
||||
supportdaemonization="true"
|
||||
CLASS='org.apache.hadoop.yarn.server.resourcemanager.ResourceManager'
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_RESOURCEMANAGER_OPTS}"
|
||||
hadoop_debug "Append YARN_RESOURCEMANAGER_OPTS onto YARN_OPTS"
|
||||
if [[ -n "${YARN_RESOURCEMANAGER_HEAPSIZE}" ]]; then
|
||||
JAVA_HEAP_MAX="-Xmx${YARN_RESOURCEMANAGER_HEAPSIZE}m"
|
||||
fi
|
||||
;;
|
||||
rmadmin)
|
||||
CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
timelineserver)
|
||||
supportdaemonization="true"
|
||||
CLASS='org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer'
|
||||
hadoop_debug "Append YARN_TIMELINESERVER_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_TIMELINESERVER_OPTS}"
|
||||
if [[ -n "${YARN_TIMELINESERVER_HEAPSIZE}" ]]; then
|
||||
JAVA_HEAP_MAX="-Xmx${YARN_TIMELINESERVER_HEAPSIZE}m"
|
||||
fi
|
||||
;;
|
||||
sharedcachemanager)
|
||||
supportdaemonization="true"
|
||||
CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
|
||||
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
|
||||
;;
|
||||
scmadmin)
|
||||
CLASS='org.apache.hadoop.yarn.client.SCMAdmin'
|
||||
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
|
||||
;;
|
||||
version)
|
||||
CLASS=org.apache.hadoop.util.VersionInfo
|
||||
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
|
||||
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
|
||||
;;
|
||||
*)
|
||||
CLASS="${COMMAND}"
|
||||
if ! hadoop_validate_classname "${CLASS}"; then
|
||||
hadoop_exit_with_usage 1
|
||||
fi
|
||||
;;
|
||||
>>>>>>> 7896815... YARN-2189. [YARN-1492] Admin service for cache manager. (Chris Trezzo via kasha)
|
||||
esac
|
||||
|
||||
if [ -f "${YARN_CONF_DIR}/yarn-env.sh" ]; then
|
||||
|
|
|
@ -97,6 +97,7 @@
|
|||
<include>application_history_client.proto</include>
|
||||
<include>server/application_history_server.proto</include>
|
||||
<include>client_SCM_protocol.proto</include>
|
||||
<include>server/SCM_Admin_protocol.proto</include>
|
||||
</includes>
|
||||
</source>
|
||||
<output>${project.build.directory}/generated-sources/java</output>
|
||||
|
|
|
@ -1401,6 +1401,18 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
|
||||
"org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
|
||||
|
||||
/** The address of the SCM admin interface. */
|
||||
public static final String SCM_ADMIN_ADDRESS =
|
||||
SHARED_CACHE_PREFIX + "admin.address";
|
||||
public static final int DEFAULT_SCM_ADMIN_PORT = 8047;
|
||||
public static final String DEFAULT_SCM_ADMIN_ADDRESS =
|
||||
"0.0.0.0:" + DEFAULT_SCM_ADMIN_PORT;
|
||||
|
||||
/** Number of threads used to handle SCM admin interface. */
|
||||
public static final String SCM_ADMIN_CLIENT_THREAD_COUNT =
|
||||
SHARED_CACHE_PREFIX + "admin.thread-count";
|
||||
public static final int DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT = 1;
|
||||
|
||||
// In-memory SCM store configuration
|
||||
|
||||
public static final String IN_MEMORY_STORE_PREFIX =
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The protocol between administrators and the <code>SharedCacheManager</code>
|
||||
* </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public interface SCMAdminProtocol {
|
||||
/**
|
||||
* <p>
|
||||
* The method used by administrators to ask SCM to run cleaner task right away
|
||||
* </p>
|
||||
*
|
||||
* @param request request <code>SharedCacheManager</code> to run a cleaner task
|
||||
* @return <code>SharedCacheManager</code> returns an empty response
|
||||
* on success and throws an exception on rejecting the request
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public RunSharedCacheCleanerTaskResponse runCleanerTask(
|
||||
RunSharedCacheCleanerTaskRequest request) throws YarnException, IOException;
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||
import org.apache.hadoop.yarn.proto.SCMAdminProtocol.SCMAdminProtocolService;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB",
|
||||
protocolVersion = 1)
|
||||
public interface SCMAdminProtocolPB extends
|
||||
SCMAdminProtocolService.BlockingInterface {
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The request from admin to ask the <code>SharedCacheManager</code> to run
|
||||
* cleaner service right away.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Currently, this is empty.
|
||||
* </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class RunSharedCacheCleanerTaskRequest {
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The response to admin from the <code>SharedCacheManager</code> when
|
||||
* is asked to run the cleaner service.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Currently, this is empty.
|
||||
* </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class RunSharedCacheCleanerTaskResponse {
|
||||
|
||||
/**
|
||||
* Get whether or not the shared cache manager has accepted the request.
|
||||
* Shared cache manager will reject the request if there is an ongoing task
|
||||
*
|
||||
* @return boolean True if the request has been accepted, false otherwise.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract boolean getAccepted();
|
||||
|
||||
/**
|
||||
* Set whether or not the shared cache manager has accepted the request Shared
|
||||
* cache manager will reject the request if there is an ongoing task
|
||||
*
|
||||
* @param b True if the request has been accepted, false otherwise.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setAccepted(boolean b);
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
option java_package = "org.apache.hadoop.yarn.proto";
|
||||
option java_outer_classname = "SCMAdminProtocol";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
package hadoop.yarn;
|
||||
|
||||
import "yarn_service_protos.proto";
|
||||
|
||||
service SCMAdminProtocolService {
|
||||
rpc runCleanerTask (RunSharedCacheCleanerTaskRequestProto) returns (RunSharedCacheCleanerTaskResponseProto);
|
||||
}
|
|
@ -354,3 +354,14 @@ message ReservationDeleteRequestProto {
|
|||
|
||||
message ReservationDeleteResponseProto {
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
/////// SCM_Admin_Protocol //////////////////////////
|
||||
//////////////////////////////////////////////////////
|
||||
|
||||
message RunSharedCacheCleanerTaskRequestProto {
|
||||
}
|
||||
|
||||
message RunSharedCacheCleanerTaskResponseProto {
|
||||
optional bool accepted = 1;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
|
||||
public class SCMAdmin extends Configured implements Tool {
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
public SCMAdmin() {
|
||||
super();
|
||||
}
|
||||
|
||||
public SCMAdmin(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
private static void printHelp(String cmd) {
|
||||
String summary = "scmadmin is the command to execute shared cache manager" +
|
||||
"administrative commands.\n" +
|
||||
"The full syntax is: \n\n" +
|
||||
"hadoop scmadmin" +
|
||||
" [-runCleanerTask]" +
|
||||
" [-help [cmd]]\n";
|
||||
|
||||
String runCleanerTask =
|
||||
"-runCleanerTask: Run cleaner task right away.\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
||||
if ("runCleanerTask".equals(cmd)) {
|
||||
System.out.println(runCleanerTask);
|
||||
} else if ("help".equals(cmd)) {
|
||||
System.out.println(help);
|
||||
} else {
|
||||
System.out.println(summary);
|
||||
System.out.println(runCleanerTask);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Displays format of commands.
|
||||
* @param cmd The command that is being executed.
|
||||
*/
|
||||
private static void printUsage(String cmd) {
|
||||
if ("-runCleanerTask".equals(cmd)) {
|
||||
System.err.println("Usage: yarn scmadmin" + " [-runCleanerTask]");
|
||||
} else {
|
||||
System.err.println("Usage: yarn scmadmin");
|
||||
System.err.println(" [-runCleanerTask]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
System.err.println();
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
protected SCMAdminProtocol createSCMAdminProtocol() throws IOException {
|
||||
// Get the current configuration
|
||||
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
||||
|
||||
// Create the admin client
|
||||
final InetSocketAddress addr = conf.getSocketAddr(
|
||||
YarnConfiguration.SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
SCMAdminProtocol scmAdminProtocol =
|
||||
(SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, addr, conf);
|
||||
return scmAdminProtocol;
|
||||
}
|
||||
|
||||
private int runCleanerTask() throws YarnException, IOException {
|
||||
// run cleaner task right away
|
||||
SCMAdminProtocol scmAdminProtocol = createSCMAdminProtocol();
|
||||
RunSharedCacheCleanerTaskRequest request =
|
||||
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class);
|
||||
RunSharedCacheCleanerTaskResponse response =
|
||||
scmAdminProtocol.runCleanerTask(request);
|
||||
if (response.getAccepted()) {
|
||||
System.out.println("request accepted by shared cache manager");
|
||||
return 0;
|
||||
} else {
|
||||
System.out.println("request rejected by shared cache manager");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
printUsage("");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
String cmd = args[i++];
|
||||
|
||||
try {
|
||||
if ("-runCleanerTask".equals(cmd)) {
|
||||
if (args.length != 1) {
|
||||
printUsage(cmd);
|
||||
return -1;
|
||||
} else {
|
||||
return runCleanerTask();
|
||||
}
|
||||
} else if ("-help".equals(cmd)) {
|
||||
if (i < args.length) {
|
||||
printUsage(args[i]);
|
||||
} else {
|
||||
printHelp("");
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
System.err.println(cmd.substring(1) + ": Unknown command");
|
||||
printUsage("");
|
||||
return -1;
|
||||
}
|
||||
|
||||
} catch (IllegalArgumentException arge) {
|
||||
System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
|
||||
printUsage(cmd);
|
||||
} catch (RemoteException e) {
|
||||
//
|
||||
// This is a error returned by hadoop server. Print
|
||||
// out the first line of the error message, ignore the stack trace.
|
||||
try {
|
||||
String[] content;
|
||||
content = e.getLocalizedMessage().split("\n");
|
||||
System.err.println(cmd.substring(1) + ": "
|
||||
+ content[0]);
|
||||
} catch (Exception ex) {
|
||||
System.err.println(cmd.substring(1) + ": "
|
||||
+ ex.getLocalizedMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println(cmd.substring(1) + ": "
|
||||
+ e.getLocalizedMessage());
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int result = ToolRunner.run(new SCMAdmin(), args);
|
||||
System.exit(result);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.impl.pb.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
public class SCMAdminProtocolPBClientImpl implements SCMAdminProtocol,
|
||||
Closeable {
|
||||
|
||||
private SCMAdminProtocolPB proxy;
|
||||
|
||||
public SCMAdminProtocolPBClientImpl(long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, SCMAdminProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
proxy = RPC.getProxy(SCMAdminProtocolPB.class, clientVersion, addr, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.proxy != null) {
|
||||
RPC.stopProxy(this.proxy);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunSharedCacheCleanerTaskResponse runCleanerTask(
|
||||
RunSharedCacheCleanerTaskRequest request) throws YarnException,
|
||||
IOException {
|
||||
YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto requestProto =
|
||||
((RunSharedCacheCleanerTaskRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new RunSharedCacheCleanerTaskResponsePBImpl(proxy.runCleanerTask(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.impl.pb.service;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
public class SCMAdminProtocolPBServiceImpl implements SCMAdminProtocolPB {
|
||||
|
||||
private SCMAdminProtocol real;
|
||||
|
||||
public SCMAdminProtocolPBServiceImpl(SCMAdminProtocol impl) {
|
||||
this.real = impl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunSharedCacheCleanerTaskResponseProto runCleanerTask(RpcController controller,
|
||||
RunSharedCacheCleanerTaskRequestProto proto) throws ServiceException {
|
||||
RunSharedCacheCleanerTaskRequestPBImpl request =
|
||||
new RunSharedCacheCleanerTaskRequestPBImpl(proto);
|
||||
try {
|
||||
RunSharedCacheCleanerTaskResponse response = real.runCleanerTask(request);
|
||||
return ((RunSharedCacheCleanerTaskResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto;
|
||||
|
||||
public class RunSharedCacheCleanerTaskRequestPBImpl extends
|
||||
RunSharedCacheCleanerTaskRequest {
|
||||
RunSharedCacheCleanerTaskRequestProto proto =
|
||||
RunSharedCacheCleanerTaskRequestProto.getDefaultInstance();
|
||||
RunSharedCacheCleanerTaskRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public RunSharedCacheCleanerTaskRequestPBImpl() {
|
||||
builder = RunSharedCacheCleanerTaskRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public RunSharedCacheCleanerTaskRequestPBImpl(
|
||||
RunSharedCacheCleanerTaskRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public RunSharedCacheCleanerTaskRequestProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = RunSharedCacheCleanerTaskRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProtoOrBuilder;
|
||||
|
||||
public class RunSharedCacheCleanerTaskResponsePBImpl extends
|
||||
RunSharedCacheCleanerTaskResponse {
|
||||
RunSharedCacheCleanerTaskResponseProto proto =
|
||||
RunSharedCacheCleanerTaskResponseProto.getDefaultInstance();
|
||||
RunSharedCacheCleanerTaskResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public RunSharedCacheCleanerTaskResponsePBImpl() {
|
||||
builder = RunSharedCacheCleanerTaskResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public RunSharedCacheCleanerTaskResponsePBImpl(
|
||||
RunSharedCacheCleanerTaskResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getAccepted() {
|
||||
RunSharedCacheCleanerTaskResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.hasAccepted()) ? p.getAccepted() : false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAccepted(boolean b) {
|
||||
maybeInitBuilder();
|
||||
builder.setAccepted(b);
|
||||
}
|
||||
|
||||
public RunSharedCacheCleanerTaskResponseProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = RunSharedCacheCleanerTaskResponseProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
}
|
|
@ -1430,6 +1430,18 @@
|
|||
<name>yarn.sharedcache.store.in-memory.check-period-mins</name>
|
||||
<value>720</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The address of the admin interface in the SCM (shared cache manager)</description>
|
||||
<name>yarn.sharedcache.admin.address</name>
|
||||
<value>0.0.0.0:8047</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The number of threads used to handle SCM admin interface (1 by default)</description>
|
||||
<name>yarn.sharedcache.admin.thread-count</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The frequency at which a cleaner task runs.
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
||||
/**
|
||||
* This service handles all SCMAdminProtocol rpc calls from administrators
|
||||
* to the shared cache manager.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class SCMAdminProtocolService extends AbstractService implements
|
||||
SCMAdminProtocol {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(SCMAdminProtocolService.class);
|
||||
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
|
||||
private Server server;
|
||||
InetSocketAddress clientBindAddress;
|
||||
private final CleanerService cleanerService;
|
||||
private AccessControlList adminAcl;
|
||||
|
||||
public SCMAdminProtocolService(CleanerService cleanerService) {
|
||||
super(SCMAdminProtocolService.class.getName());
|
||||
this.cleanerService = cleanerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.clientBindAddress = getBindAddress(conf);
|
||||
adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
InetSocketAddress getBindAddress(Configuration conf) {
|
||||
return conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
this.server =
|
||||
rpc.getServer(SCMAdminProtocol.class, this,
|
||||
clientBindAddress,
|
||||
conf, null, // Secret manager null for now (security not supported)
|
||||
conf.getInt(YarnConfiguration.SCM_ADMIN_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT));
|
||||
|
||||
// TODO: Enable service authorization (see YARN-2774)
|
||||
|
||||
this.server.start();
|
||||
clientBindAddress =
|
||||
conf.updateConnectAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
|
||||
server.getListenerAddress());
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
private void checkAcls(String method) throws YarnException {
|
||||
UserGroupInformation user;
|
||||
try {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get current user", ioe);
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
|
||||
if (!adminAcl.isUserAllowed(user)) {
|
||||
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
|
||||
" to call '" + method + "'");
|
||||
|
||||
throw RPCUtil.getRemoteException(
|
||||
new AccessControlException("User " + user.getShortUserName() +
|
||||
" doesn't have permission" + " to call '" + method + "'"));
|
||||
}
|
||||
LOG.info("SCM Admin: " + method + " invoked by user " +
|
||||
user.getShortUserName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunSharedCacheCleanerTaskResponse runCleanerTask(
|
||||
RunSharedCacheCleanerTaskRequest request) throws YarnException {
|
||||
checkAcls("runCleanerTask");
|
||||
RunSharedCacheCleanerTaskResponse response =
|
||||
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskResponse.class);
|
||||
this.cleanerService.runCleanerTask();
|
||||
// if we are here, then we have submitted the request to the cleaner
|
||||
// service, ack the request to the admin client
|
||||
response.setAccepted(true);
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -74,6 +74,9 @@ public class SharedCacheManager extends CompositeService {
|
|||
ClientProtocolService cps = createClientProtocolService(store);
|
||||
addService(cps);
|
||||
|
||||
SCMAdminProtocolService saps = createSCMAdminProtocolService(cs);
|
||||
addService(saps);
|
||||
|
||||
// init metrics
|
||||
DefaultMetricsSystem.initialize("SharedCacheManager");
|
||||
JvmMetrics.initSingleton("SharedCacheManager", null);
|
||||
|
@ -113,6 +116,11 @@ public class SharedCacheManager extends CompositeService {
|
|||
return new ClientProtocolService(store);
|
||||
}
|
||||
|
||||
private SCMAdminProtocolService createSCMAdminProtocolService(
|
||||
CleanerService cleanerService) {
|
||||
return new SCMAdminProtocolService(cleanerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.client.SCMAdmin;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Basic unit tests for the SCM Admin Protocol Service and SCMAdmin.
|
||||
*/
|
||||
public class TestSCMAdminProtocolService {
|
||||
|
||||
static SCMAdminProtocolService service;
|
||||
static SCMAdminProtocol SCMAdminProxy;
|
||||
static SCMAdminProtocol mockAdmin;
|
||||
static SCMAdmin adminCLI;
|
||||
static SCMStore store;
|
||||
static CleanerService cleaner;
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
|
||||
@Before
|
||||
public void startUp() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.SCM_STORE_CLASS,
|
||||
InMemorySCMStore.class.getName());
|
||||
|
||||
cleaner = mock(CleanerService.class);
|
||||
|
||||
service = spy(new SCMAdminProtocolService(cleaner));
|
||||
service.init(conf);
|
||||
service.start();
|
||||
|
||||
YarnRPC rpc = YarnRPC.create(new Configuration());
|
||||
|
||||
InetSocketAddress scmAddress =
|
||||
conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
|
||||
|
||||
SCMAdminProxy =
|
||||
(SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, scmAddress,
|
||||
conf);
|
||||
|
||||
mockAdmin = mock(SCMAdminProtocol.class);
|
||||
adminCLI = new SCMAdmin(new Configuration()) {
|
||||
@Override
|
||||
protected SCMAdminProtocol createSCMAdminProtocol() throws IOException {
|
||||
return mockAdmin;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUpTest() {
|
||||
if (service != null) {
|
||||
service.stop();
|
||||
}
|
||||
|
||||
if (SCMAdminProxy != null) {
|
||||
RPC.stopProxy(SCMAdminProxy);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunCleanerTask() throws Exception {
|
||||
doNothing().when(cleaner).runCleanerTask();
|
||||
RunSharedCacheCleanerTaskRequest request =
|
||||
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class);
|
||||
RunSharedCacheCleanerTaskResponse response = SCMAdminProxy.runCleanerTask(request);
|
||||
Assert.assertTrue("cleaner task request isn't accepted", response.getAccepted());
|
||||
verify(service, times(1)).runCleanerTask(any(RunSharedCacheCleanerTaskRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunCleanerTaskCLI() throws Exception {
|
||||
String[] args = { "-runCleanerTask" };
|
||||
RunSharedCacheCleanerTaskResponse rp =
|
||||
new RunSharedCacheCleanerTaskResponsePBImpl();
|
||||
rp.setAccepted(true);
|
||||
when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class)))
|
||||
.thenReturn(rp);
|
||||
assertEquals(0, adminCLI.run(args));
|
||||
rp.setAccepted(false);
|
||||
when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class)))
|
||||
.thenReturn(rp);
|
||||
assertEquals(1, adminCLI.run(args));
|
||||
verify(mockAdmin, times(2)).runCleanerTask(
|
||||
any(RunSharedCacheCleanerTaskRequest.class));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue