YARN-2189. [YARN-1492] Admin service for cache manager. (Chris Trezzo via kasha)

This commit is contained in:
Karthik Kambatla 2014-12-04 17:36:32 -08:00
parent 26d8dec756
commit 78968155d7
19 changed files with 973 additions and 0 deletions

View File

@ -54,6 +54,9 @@ Release 2.7.0 - UNRELEASED
YARN-2188. [YARN-1492] Client service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
YARN-2189. [YARN-1492] Admin service for cache manager.
(Chris Trezzo via kasha)
YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
via jianhe)

View File

@ -36,6 +36,7 @@ function hadoop_usage
echo " resourcemanager -format-state-store deletes the RMStateStore"
echo " rmadmin admin tools"
echo " sharedcachemanager run the SharedCacheManager daemon"
echo " scmadmin SharedCacheManager admin tools"
echo " timelineserver run the timeline server"
echo " version print the version"
echo " or"
@ -162,6 +163,10 @@ case "${COMMAND}" in
CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
;;
scmadmin)
CLASS='org.apache.hadoop.yarn.client.SCMAdmin'
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"

View File

@ -97,6 +97,7 @@
<include>application_history_client.proto</include>
<include>server/application_history_server.proto</include>
<include>client_SCM_protocol.proto</include>
<include>server/SCM_Admin_protocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>

View File

@ -1401,6 +1401,18 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
"org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
/** The address of the SCM admin interface. */
public static final String SCM_ADMIN_ADDRESS =
SHARED_CACHE_PREFIX + "admin.address";
public static final int DEFAULT_SCM_ADMIN_PORT = 8047;
public static final String DEFAULT_SCM_ADMIN_ADDRESS =
"0.0.0.0:" + DEFAULT_SCM_ADMIN_PORT;
/** Number of threads used to handle SCM admin interface. */
public static final String SCM_ADMIN_CLIENT_THREAD_COUNT =
SHARED_CACHE_PREFIX + "admin.thread-count";
public static final int DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT = 1;
// In-memory SCM store configuration
public static final String IN_MEMORY_STORE_PREFIX =

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* <p>
* The protocol between administrators and the <code>SharedCacheManager</code>
* </p>
*/
@Public
@Unstable
public interface SCMAdminProtocol {
/**
* <p>
* The method used by administrators to ask SCM to run cleaner task right away
* </p>
*
* @param request request <code>SharedCacheManager</code> to run a cleaner task
* @return <code>SharedCacheManager</code> returns an empty response
* on success and throws an exception on rejecting the request
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public RunSharedCacheCleanerTaskResponse runCleanerTask(
RunSharedCacheCleanerTaskRequest request) throws YarnException, IOException;
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.SCMAdminProtocol.SCMAdminProtocolService;
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB",
protocolVersion = 1)
public interface SCMAdminProtocolPB extends
SCMAdminProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* The request from admin to ask the <code>SharedCacheManager</code> to run
* cleaner service right away.
* </p>
*
* <p>
* Currently, this is empty.
* </p>
*/
@Public
@Unstable
public abstract class RunSharedCacheCleanerTaskRequest {
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* The response to admin from the <code>SharedCacheManager</code> when
* is asked to run the cleaner service.
* </p>
*
* <p>
* Currently, this is empty.
* </p>
*/
@Public
@Unstable
public abstract class RunSharedCacheCleanerTaskResponse {
/**
* Get whether or not the shared cache manager has accepted the request.
* Shared cache manager will reject the request if there is an ongoing task
*
* @return boolean True if the request has been accepted, false otherwise.
*/
@Public
@Unstable
public abstract boolean getAccepted();
/**
* Set whether or not the shared cache manager has accepted the request Shared
* cache manager will reject the request if there is an ongoing task
*
* @param b True if the request has been accepted, false otherwise.
*/
@Public
@Unstable
public abstract void setAccepted(boolean b);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "SCMAdminProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_service_protos.proto";
service SCMAdminProtocolService {
rpc runCleanerTask (RunSharedCacheCleanerTaskRequestProto) returns (RunSharedCacheCleanerTaskResponseProto);
}

View File

@ -354,3 +354,14 @@ message ReservationDeleteRequestProto {
message ReservationDeleteResponseProto {
}
//////////////////////////////////////////////////////
/////// SCM_Admin_Protocol //////////////////////////
//////////////////////////////////////////////////////
message RunSharedCacheCleanerTaskRequestProto {
}
message RunSharedCacheCleanerTaskResponseProto {
optional bool accepted = 1;
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
public class SCMAdmin extends Configured implements Tool {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public SCMAdmin() {
super();
}
public SCMAdmin(Configuration conf) {
super(conf);
}
private static void printHelp(String cmd) {
String summary = "scmadmin is the command to execute shared cache manager" +
"administrative commands.\n" +
"The full syntax is: \n\n" +
"hadoop scmadmin" +
" [-runCleanerTask]" +
" [-help [cmd]]\n";
String runCleanerTask =
"-runCleanerTask: Run cleaner task right away.\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
if ("runCleanerTask".equals(cmd)) {
System.out.println(runCleanerTask);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
System.out.println(summary);
System.out.println(runCleanerTask);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
}
}
/**
* Displays format of commands.
* @param cmd The command that is being executed.
*/
private static void printUsage(String cmd) {
if ("-runCleanerTask".equals(cmd)) {
System.err.println("Usage: yarn scmadmin" + " [-runCleanerTask]");
} else {
System.err.println("Usage: yarn scmadmin");
System.err.println(" [-runCleanerTask]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
}
}
protected SCMAdminProtocol createSCMAdminProtocol() throws IOException {
// Get the current configuration
final YarnConfiguration conf = new YarnConfiguration(getConf());
// Create the admin client
final InetSocketAddress addr = conf.getSocketAddr(
YarnConfiguration.SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
final YarnRPC rpc = YarnRPC.create(conf);
SCMAdminProtocol scmAdminProtocol =
(SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, addr, conf);
return scmAdminProtocol;
}
private int runCleanerTask() throws YarnException, IOException {
// run cleaner task right away
SCMAdminProtocol scmAdminProtocol = createSCMAdminProtocol();
RunSharedCacheCleanerTaskRequest request =
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class);
RunSharedCacheCleanerTaskResponse response =
scmAdminProtocol.runCleanerTask(request);
if (response.getAccepted()) {
System.out.println("request accepted by shared cache manager");
return 0;
} else {
System.out.println("request rejected by shared cache manager");
return 1;
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 1) {
printUsage("");
return -1;
}
int i = 0;
String cmd = args[i++];
try {
if ("-runCleanerTask".equals(cmd)) {
if (args.length != 1) {
printUsage(cmd);
return -1;
} else {
return runCleanerTask();
}
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
} else {
printHelp("");
}
return 0;
} else {
System.err.println(cmd.substring(1) + ": Unknown command");
printUsage("");
return -1;
}
} catch (IllegalArgumentException arge) {
System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
printUsage(cmd);
} catch (RemoteException e) {
//
// This is a error returned by hadoop server. Print
// out the first line of the error message, ignore the stack trace.
try {
String[] content;
content = e.getLocalizedMessage().split("\n");
System.err.println(cmd.substring(1) + ": "
+ content[0]);
} catch (Exception ex) {
System.err.println(cmd.substring(1) + ": "
+ ex.getLocalizedMessage());
}
} catch (Exception e) {
System.err.println(cmd.substring(1) + ": "
+ e.getLocalizedMessage());
}
return -1;
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new SCMAdmin(), args);
System.exit(result);
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.impl.pb.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import com.google.protobuf.ServiceException;
public class SCMAdminProtocolPBClientImpl implements SCMAdminProtocol,
Closeable {
private SCMAdminProtocolPB proxy;
public SCMAdminProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, SCMAdminProtocolPB.class,
ProtobufRpcEngine.class);
proxy = RPC.getProxy(SCMAdminProtocolPB.class, clientVersion, addr, conf);
}
@Override
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
}
}
@Override
public RunSharedCacheCleanerTaskResponse runCleanerTask(
RunSharedCacheCleanerTaskRequest request) throws YarnException,
IOException {
YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto requestProto =
((RunSharedCacheCleanerTaskRequestPBImpl) request).getProto();
try {
return new RunSharedCacheCleanerTaskResponsePBImpl(proxy.runCleanerTask(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class SCMAdminProtocolPBServiceImpl implements SCMAdminProtocolPB {
private SCMAdminProtocol real;
public SCMAdminProtocolPBServiceImpl(SCMAdminProtocol impl) {
this.real = impl;
}
@Override
public RunSharedCacheCleanerTaskResponseProto runCleanerTask(RpcController controller,
RunSharedCacheCleanerTaskRequestProto proto) throws ServiceException {
RunSharedCacheCleanerTaskRequestPBImpl request =
new RunSharedCacheCleanerTaskRequestPBImpl(proto);
try {
RunSharedCacheCleanerTaskResponse response = real.runCleanerTask(request);
return ((RunSharedCacheCleanerTaskResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto;
public class RunSharedCacheCleanerTaskRequestPBImpl extends
RunSharedCacheCleanerTaskRequest {
RunSharedCacheCleanerTaskRequestProto proto =
RunSharedCacheCleanerTaskRequestProto.getDefaultInstance();
RunSharedCacheCleanerTaskRequestProto.Builder builder = null;
boolean viaProto = false;
public RunSharedCacheCleanerTaskRequestPBImpl() {
builder = RunSharedCacheCleanerTaskRequestProto.newBuilder();
}
public RunSharedCacheCleanerTaskRequestPBImpl(
RunSharedCacheCleanerTaskRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public RunSharedCacheCleanerTaskRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RunSharedCacheCleanerTaskRequestProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProtoOrBuilder;
public class RunSharedCacheCleanerTaskResponsePBImpl extends
RunSharedCacheCleanerTaskResponse {
RunSharedCacheCleanerTaskResponseProto proto =
RunSharedCacheCleanerTaskResponseProto.getDefaultInstance();
RunSharedCacheCleanerTaskResponseProto.Builder builder = null;
boolean viaProto = false;
public RunSharedCacheCleanerTaskResponsePBImpl() {
builder = RunSharedCacheCleanerTaskResponseProto.newBuilder();
}
public RunSharedCacheCleanerTaskResponsePBImpl(
RunSharedCacheCleanerTaskResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public boolean getAccepted() {
RunSharedCacheCleanerTaskResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAccepted()) ? p.getAccepted() : false;
}
@Override
public void setAccepted(boolean b) {
maybeInitBuilder();
builder.setAccepted(b);
}
public RunSharedCacheCleanerTaskResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RunSharedCacheCleanerTaskResponseProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -1431,6 +1431,18 @@
<name>yarn.sharedcache.store.in-memory.check-period-mins</name>
<value>720</value>
</property>
<property>
<description>The address of the admin interface in the SCM (shared cache manager)</description>
<name>yarn.sharedcache.admin.address</name>
<value>0.0.0.0:8047</value>
</property>
<property>
<description>The number of threads used to handle SCM admin interface (1 by default)</description>
<name>yarn.sharedcache.admin.thread-count</name>
<value>1</value>
</property>
<property>
<description>The frequency at which a cleaner task runs.

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
/**
* This service handles all SCMAdminProtocol rpc calls from administrators
* to the shared cache manager.
*/
@Private
@Unstable
public class SCMAdminProtocolService extends AbstractService implements
SCMAdminProtocol {
private static final Log LOG = LogFactory.getLog(SCMAdminProtocolService.class);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Server server;
InetSocketAddress clientBindAddress;
private final CleanerService cleanerService;
private AccessControlList adminAcl;
public SCMAdminProtocolService(CleanerService cleanerService) {
super(SCMAdminProtocolService.class.getName());
this.cleanerService = cleanerService;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.clientBindAddress = getBindAddress(conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
super.serviceInit(conf);
}
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(SCMAdminProtocol.class, this,
clientBindAddress,
conf, null, // Secret manager null for now (security not supported)
conf.getInt(YarnConfiguration.SCM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT));
// TODO: Enable service authorization (see YARN-2774)
this.server.start();
clientBindAddress =
conf.updateConnectAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
private void checkAcls(String method) throws YarnException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
throw RPCUtil.getRemoteException(ioe);
}
if (!adminAcl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
throw RPCUtil.getRemoteException(
new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" + " to call '" + method + "'"));
}
LOG.info("SCM Admin: " + method + " invoked by user " +
user.getShortUserName());
}
@Override
public RunSharedCacheCleanerTaskResponse runCleanerTask(
RunSharedCacheCleanerTaskRequest request) throws YarnException {
checkAcls("runCleanerTask");
RunSharedCacheCleanerTaskResponse response =
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskResponse.class);
this.cleanerService.runCleanerTask();
// if we are here, then we have submitted the request to the cleaner
// service, ack the request to the admin client
response.setAccepted(true);
return response;
}
}

View File

@ -74,6 +74,9 @@ public class SharedCacheManager extends CompositeService {
ClientProtocolService cps = createClientProtocolService(store);
addService(cps);
SCMAdminProtocolService saps = createSCMAdminProtocolService(cs);
addService(saps);
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@ -113,6 +116,11 @@ public class SharedCacheManager extends CompositeService {
return new ClientProtocolService(store);
}
private SCMAdminProtocolService createSCMAdminProtocolService(
CleanerService cleanerService) {
return new SCMAdminProtocolService(cleanerService);
}
@Override
protected void serviceStop() throws Exception {

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl;
import org.apache.hadoop.yarn.client.SCMAdmin;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Basic unit tests for the SCM Admin Protocol Service and SCMAdmin.
*/
public class TestSCMAdminProtocolService {
static SCMAdminProtocolService service;
static SCMAdminProtocol SCMAdminProxy;
static SCMAdminProtocol mockAdmin;
static SCMAdmin adminCLI;
static SCMStore store;
static CleanerService cleaner;
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@Before
public void startUp() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.SCM_STORE_CLASS,
InMemorySCMStore.class.getName());
cleaner = mock(CleanerService.class);
service = spy(new SCMAdminProtocolService(cleaner));
service.init(conf);
service.start();
YarnRPC rpc = YarnRPC.create(new Configuration());
InetSocketAddress scmAddress =
conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
SCMAdminProxy =
(SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, scmAddress,
conf);
mockAdmin = mock(SCMAdminProtocol.class);
adminCLI = new SCMAdmin(new Configuration()) {
@Override
protected SCMAdminProtocol createSCMAdminProtocol() throws IOException {
return mockAdmin;
}
};
}
@After
public void cleanUpTest() {
if (service != null) {
service.stop();
}
if (SCMAdminProxy != null) {
RPC.stopProxy(SCMAdminProxy);
}
}
@Test
public void testRunCleanerTask() throws Exception {
doNothing().when(cleaner).runCleanerTask();
RunSharedCacheCleanerTaskRequest request =
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class);
RunSharedCacheCleanerTaskResponse response = SCMAdminProxy.runCleanerTask(request);
Assert.assertTrue("cleaner task request isn't accepted", response.getAccepted());
verify(service, times(1)).runCleanerTask(any(RunSharedCacheCleanerTaskRequest.class));
}
@Test
public void testRunCleanerTaskCLI() throws Exception {
String[] args = { "-runCleanerTask" };
RunSharedCacheCleanerTaskResponse rp =
new RunSharedCacheCleanerTaskResponsePBImpl();
rp.setAccepted(true);
when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class)))
.thenReturn(rp);
assertEquals(0, adminCLI.run(args));
rp.setAccepted(false);
when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class)))
.thenReturn(rp);
assertEquals(1, adminCLI.run(args));
verify(mockAdmin, times(2)).runCleanerTask(
any(RunSharedCacheCleanerTaskRequest.class));
}
}