YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)

(cherry picked from commit 834b3b98e7ecc4bfe93c7dd1a9e928e9653cf138)
This commit is contained in:
Karthik Kambatla 2014-11-25 16:21:29 -08:00
parent ae35b0e14d
commit 3d4536af82
22 changed files with 1570 additions and 4 deletions

View File

@ -19,7 +19,10 @@ Release 2.7.0 - UNRELEASED
(Chris Trezzo and Sangjin Lee via kasha)
YARN-2236. [YARN-1492] Shared Cache uploader service on the Node
Manager. (Chris Trezzo and Sanjin Lee via kasha)
Manager. (Chris Trezzo and Sangjin Lee via kasha)
YARN-2188. [YARN-1492] Client service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
IMPROVEMENTS

View File

@ -96,6 +96,7 @@
<include>server/resourcemanager_administration_protocol.proto</include>
<include>application_history_client.proto</include>
<include>server/application_history_server.proto</include>
<include>client_SCM_protocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* <p>
* The protocol between clients and the <code>SharedCacheManager</code> to claim
* and release resources in the shared cache.
* </p>
*/
@Public
@Unstable
public interface ClientSCMProtocol {
/**
* <p>
* The interface used by clients to claim a resource with the
* <code>SharedCacheManager.</code> The client uses a checksum to identify the
* resource and an {@link ApplicationId} to identify which application will be
* using the resource.
* </p>
*
* <p>
* The <code>SharedCacheManager</code> responds with whether or not the
* resource exists in the cache. If the resource exists, a <code>Path</code>
* to the resource in the shared cache is returned. If the resource does not
* exist, the response is empty.
* </p>
*
* @param request request to claim a resource in the shared cache
* @return response indicating if the resource is already in the cache
* @throws YarnException
* @throws IOException
*/
public UseSharedCacheResourceResponse use(
UseSharedCacheResourceRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to release a resource with the
* <code>SharedCacheManager.</code> This method is called once an application
* is no longer using a claimed resource in the shared cache. The client uses
* a checksum to identify the resource and an {@link ApplicationId} to
* identify which application is releasing the resource.
* </p>
*
* <p>
* Note: This method is an optimization and the client is not required to call
* it for correctness.
* </p>
*
* <p>
* Currently the <code>SharedCacheManager</code> sends an empty response.
* </p>
*
* @param request request to release a resource in the shared cache
* @return (empty) response on releasing the resource
* @throws YarnException
* @throws IOException
*/
public ReleaseSharedCacheResourceResponse release(
ReleaseSharedCacheResourceRequest request) throws YarnException, IOException;
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService;
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB",
protocolVersion = 1)
public interface ClientSCMProtocolPB extends
ClientSCMProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* <p>The request from clients to release a resource in the shared cache.</p>
*/
@Public
@Unstable
public abstract class ReleaseSharedCacheResourceRequest {
/**
* Get the <code>ApplicationId</code> of the resource to be released.
*
* @return <code>ApplicationId</code>
*/
@Public
@Unstable
public abstract ApplicationId getAppId();
/**
* Set the <code>ApplicationId</code> of the resource to be released.
*
* @param id <code>ApplicationId</code>
*/
@Public
@Unstable
public abstract void setAppId(ApplicationId id);
/**
* Get the <code>key</code> of the resource to be released.
*
* @return <code>key</code>
*/
@Public
@Unstable
public abstract String getResourceKey();
/**
* Set the <code>key</code> of the resource to be released.
*
* @param key unique identifier for the resource
*/
@Public
@Unstable
public abstract void setResourceKey(String key);
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* The response to clients from the <code>SharedCacheManager</code> when
* releasing a resource in the shared cache.
* </p>
*
* <p>
* Currently, this is empty.
* </p>
*/
@Public
@Unstable
public abstract class ReleaseSharedCacheResourceResponse {
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* <p>
* The request from clients to the <code>SharedCacheManager</code> that claims a
* resource in the shared cache.
* </p>
*/
@Public
@Unstable
public abstract class UseSharedCacheResourceRequest {
/**
* Get the <code>ApplicationId</code> of the resource to be used.
*
* @return <code>ApplicationId</code>
*/
@Public
@Unstable
public abstract ApplicationId getAppId();
/**
* Set the <code>ApplicationId</code> of the resource to be used.
*
* @param id <code>ApplicationId</code>
*/
@Public
@Unstable
public abstract void setAppId(ApplicationId id);
/**
* Get the <code>key</code> of the resource to be used.
*
* @return <code>key</code>
*/
@Public
@Unstable
public abstract String getResourceKey();
/**
* Set the <code>key</code> of the resource to be used.
*
* @param key unique identifier for the resource
*/
@Public
@Unstable
public abstract void setResourceKey(String key);
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* The response from the SharedCacheManager to the client that indicates whether
* a requested resource exists in the cache.
* </p>
*/
@Public
@Unstable
public abstract class UseSharedCacheResourceResponse {
/**
* Get the <code>Path</code> corresponding to the requested resource in the
* shared cache.
*
* @return String A <code>Path</code> if the resource exists in the shared
* cache, <code>null</code> otherwise
*/
@Public
@Unstable
public abstract String getPath();
/**
* Set the <code>Path</code> corresponding to a resource in the shared cache.
*
* @param p A <code>Path</code> corresponding to a resource in the shared
* cache
*/
@Public
@Unstable
public abstract void setPath(String p);
}

View File

@ -1472,6 +1472,18 @@ public class YarnConfiguration extends Configuration {
SHARED_CACHE_PREFIX + "uploader.server.thread-count";
public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
/** The address of the client interface in the SCM. */
public static final String SCM_CLIENT_SERVER_ADDRESS =
SHARED_CACHE_PREFIX + "client-server.address";
public static final int DEFAULT_SCM_CLIENT_SERVER_PORT = 8045;
public static final String DEFAULT_SCM_CLIENT_SERVER_ADDRESS = "0.0.0.0:"
+ DEFAULT_SCM_CLIENT_SERVER_PORT;
/** The number of threads used to handle shared cache manager requests. */
public static final String SCM_CLIENT_SERVER_THREAD_COUNT =
SHARED_CACHE_PREFIX + "client-server.thread-count";
public static final int DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT = 50;
/** the checksum algorithm implementation **/
public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
SHARED_CACHE_PREFIX + "checksum.algo.impl";

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ClientSCMProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_service_protos.proto";
service ClientSCMProtocolService {
rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto);
rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto);
}

View File

@ -306,6 +306,27 @@ message GetContainersResponseProto {
repeated ContainerReportProto containers = 1;
}
//////////////////////////////////////////////////////
/////// client_SCM_Protocol //////////////////////////
//////////////////////////////////////////////////////
message UseSharedCacheResourceRequestProto {
optional ApplicationIdProto applicationId = 1;
optional string resourceKey = 2;
}
message UseSharedCacheResourceResponseProto {
optional string path = 1;
}
message ReleaseSharedCacheResourceRequestProto {
optional ApplicationIdProto applicationId = 1;
optional string resourceKey = 2;
}
message ReleaseSharedCacheResourceResponseProto {
}
//////////////////////////////////////////////////////
// reservation_protocol
//////////////////////////////////////////////////////

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.impl.pb.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
import com.google.protobuf.ServiceException;
public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol,
Closeable {
private ClientSCMProtocolPB proxy;
public ClientSCMProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class,
ProtobufRpcEngine.class);
proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf);
}
@Override
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
this.proxy = null;
}
}
@Override
public UseSharedCacheResourceResponse use(
UseSharedCacheResourceRequest request) throws YarnException, IOException {
UseSharedCacheResourceRequestProto requestProto =
((UseSharedCacheResourceRequestPBImpl) request).getProto();
try {
return new UseSharedCacheResourceResponsePBImpl(proxy.use(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public ReleaseSharedCacheResourceResponse release(
ReleaseSharedCacheResourceRequest request) throws YarnException,
IOException {
ReleaseSharedCacheResourceRequestProto requestProto =
((ReleaseSharedCacheResourceRequestPBImpl) request).getProto();
try {
return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB {
private ClientSCMProtocol real;
public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) {
this.real = impl;
}
@Override
public UseSharedCacheResourceResponseProto use(RpcController controller,
UseSharedCacheResourceRequestProto proto) throws ServiceException {
UseSharedCacheResourceRequestPBImpl request =
new UseSharedCacheResourceRequestPBImpl(proto);
try {
UseSharedCacheResourceResponse response = real.use(request);
return ((UseSharedCacheResourceResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ReleaseSharedCacheResourceResponseProto release(
RpcController controller, ReleaseSharedCacheResourceRequestProto proto)
throws ServiceException {
ReleaseSharedCacheResourceRequestPBImpl request =
new ReleaseSharedCacheResourceRequestPBImpl(proto);
try {
ReleaseSharedCacheResourceResponse response = real.release(request);
return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder;
public class ReleaseSharedCacheResourceRequestPBImpl extends
ReleaseSharedCacheResourceRequest {
ReleaseSharedCacheResourceRequestProto proto =
ReleaseSharedCacheResourceRequestProto.getDefaultInstance();
ReleaseSharedCacheResourceRequestProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
public ReleaseSharedCacheResourceRequestPBImpl() {
builder = ReleaseSharedCacheResourceRequestProto.newBuilder();
}
public ReleaseSharedCacheResourceRequestPBImpl(
ReleaseSharedCacheResourceRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public ReleaseSharedCacheResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public ApplicationId getAppId() {
ReleaseSharedCacheResourceRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (this.applicationId != null) {
return this.applicationId;
}
if (!p.hasApplicationId()) {
return null;
}
this.applicationId = convertFromProtoFormat(p.getApplicationId());
return this.applicationId;
}
@Override
public void setAppId(ApplicationId id) {
maybeInitBuilder();
if (id == null)
builder.clearApplicationId();
this.applicationId = id;
}
@Override
public String getResourceKey() {
ReleaseSharedCacheResourceRequestProtoOrBuilder p =
viaProto ? proto : builder;
return (p.hasResourceKey()) ? p.getResourceKey() : null;
}
@Override
public void setResourceKey(String key) {
maybeInitBuilder();
if (key == null) {
builder.clearResourceKey();
return;
}
builder.setResourceKey(key);
}
private void mergeLocalToBuilder() {
if (applicationId != null) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto);
}
viaProto = false;
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
public class ReleaseSharedCacheResourceResponsePBImpl extends
ReleaseSharedCacheResourceResponse {
ReleaseSharedCacheResourceResponseProto proto =
ReleaseSharedCacheResourceResponseProto.getDefaultInstance();
ReleaseSharedCacheResourceResponseProto.Builder builder = null;
boolean viaProto = false;
public ReleaseSharedCacheResourceResponsePBImpl() {
builder = ReleaseSharedCacheResourceResponseProto.newBuilder();
}
public ReleaseSharedCacheResourceResponsePBImpl(
ReleaseSharedCacheResourceResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public ReleaseSharedCacheResourceResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder;
public class UseSharedCacheResourceRequestPBImpl extends
UseSharedCacheResourceRequest {
UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto
.getDefaultInstance();
UseSharedCacheResourceRequestProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
public UseSharedCacheResourceRequestPBImpl() {
builder = UseSharedCacheResourceRequestProto.newBuilder();
}
public UseSharedCacheResourceRequestPBImpl(
UseSharedCacheResourceRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public UseSharedCacheResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public ApplicationId getAppId() {
UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationId != null) {
return this.applicationId;
}
if (!p.hasApplicationId()) {
return null;
}
this.applicationId = convertFromProtoFormat(p.getApplicationId());
return this.applicationId;
}
@Override
public void setAppId(ApplicationId id) {
maybeInitBuilder();
if (id == null)
builder.clearApplicationId();
this.applicationId = id;
}
@Override
public String getResourceKey() {
UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasResourceKey()) ? p.getResourceKey() : null;
}
@Override
public void setResourceKey(String key) {
maybeInitBuilder();
if (key == null) {
builder.clearResourceKey();
return;
}
builder.setResourceKey(key);
}
private void mergeLocalToBuilder() {
if (applicationId != null) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UseSharedCacheResourceRequestProto.newBuilder(proto);
}
viaProto = false;
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder;
public class UseSharedCacheResourceResponsePBImpl extends
UseSharedCacheResourceResponse {
UseSharedCacheResourceResponseProto proto =
UseSharedCacheResourceResponseProto
.getDefaultInstance();
UseSharedCacheResourceResponseProto.Builder builder = null;
boolean viaProto = false;
public UseSharedCacheResourceResponsePBImpl() {
builder = UseSharedCacheResourceResponseProto.newBuilder();
}
public UseSharedCacheResourceResponsePBImpl(
UseSharedCacheResourceResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public UseSharedCacheResourceResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public String getPath() {
UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasPath()) ? p.getPath() : null;
}
@Override
public void setPath(String path) {
maybeInitBuilder();
if (path == null) {
builder.clearPath();
return;
}
builder.setPath(path);
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UseSharedCacheResourceResponseProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -1458,19 +1458,36 @@
</property>
<property>
<description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
<description>The address of the client interface in the SCM
(shared cache manager)</description>
<name>yarn.sharedcache.client-server.address</name>
<value>0.0.0.0:8045</value>
</property>
<property>
<description>The number of threads used to handle shared cache manager
requests from clients (50 by default)</description>
<name>yarn.sharedcache.client-server.thread-count</name>
<value>50</value>
</property>
<property>
<description>The algorithm used to compute checksums of files (SHA-256 by
default)</description>
<name>yarn.sharedcache.checksum.algo.impl</name>
<value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
</property>
<property>
<description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
<description>The replication factor for the node manager uploader for the
shared cache (10 by default)</description>
<name>yarn.sharedcache.nm.uploader.replication.factor</name>
<value>10</value>
</property>
<property>
<description>The number of threads used to upload files from a node manager instance (20 by default)</description>
<description>The number of threads used to upload files from a node manager
instance (20 by default)</description>
<name>yarn.sharedcache.nm.uploader.thread-count</name>
<value>20</value>
</property>

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
/**
* This service handles all rpc calls from the client to the shared cache
* manager.
*/
@Private
@Evolving
public class ClientProtocolService extends AbstractService implements
ClientSCMProtocol {
private static final Log LOG = LogFactory.getLog(ClientProtocolService.class);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Server server;
InetSocketAddress clientBindAddress;
private final SCMStore store;
private int cacheDepth;
private String cacheRoot;
private ClientSCMMetrics metrics;
public ClientProtocolService(SCMStore store) {
super(ClientProtocolService.class.getName());
this.store = store;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.clientBindAddress = getBindAddress(conf);
this.cacheDepth = SharedCacheUtil.getCacheDepth(conf);
this.cacheRoot =
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
super.serviceInit(conf);
}
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
this.metrics = ClientSCMMetrics.initSingleton(conf);
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ClientSCMProtocol.class, this,
clientBindAddress,
conf, null, // Secret manager null for now (security not supported)
conf.getInt(YarnConfiguration.SCM_CLIENT_SERVER_THREAD_COUNT,
YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT));
// TODO (YARN-2774): Enable service authorization
this.server.start();
clientBindAddress =
conf.updateConnectAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
@Override
public UseSharedCacheResourceResponse use(
UseSharedCacheResourceRequest request) throws YarnException,
IOException {
UseSharedCacheResourceResponse response =
recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class);
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
String fileName =
this.store.addResourceReference(request.getResourceKey(),
new SharedCacheResourceReference(request.getAppId(),
callerUGI.getShortUserName()));
if (fileName != null) {
response
.setPath(getCacheEntryFilePath(request.getResourceKey(), fileName));
this.metrics.incCacheHitCount();
} else {
this.metrics.incCacheMissCount();
}
return response;
}
@Override
public ReleaseSharedCacheResourceResponse release(
ReleaseSharedCacheResourceRequest request) throws YarnException,
IOException {
ReleaseSharedCacheResourceResponse response =
recordFactory
.newRecordInstance(ReleaseSharedCacheResourceResponse.class);
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
boolean removed =
this.store.removeResourceReference(
request.getResourceKey(),
new SharedCacheResourceReference(request.getAppId(), callerUGI
.getShortUserName()), true);
if (removed) {
this.metrics.incCacheRelease();
}
return response;
}
private String getCacheEntryFilePath(String checksum, String filename) {
return SharedCacheUtil.getCacheEntryPath(this.cacheDepth,
this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename;
}
}

View File

@ -71,6 +71,9 @@ public class SharedCacheManager extends CompositeService {
createNMCacheUploaderSCMProtocolService(store);
addService(nms);
ClientProtocolService cps = createClientProtocolService(store);
addService(cps);
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@ -106,6 +109,10 @@ public class SharedCacheManager extends CompositeService {
return new SharedCacheUploaderService(store);
}
private ClientProtocolService createClientProtocolService(SCMStore store) {
return new ClientProtocolService(store);
}
@Override
protected void serviceStop() throws Exception {

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class is for maintaining client requests metrics
* and publishing them through the metrics interfaces.
*/
@Private
@Unstable
@Metrics(about="Client SCM metrics", context="yarn")
public class ClientSCMMetrics {
private static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class);
final MetricsRegistry registry;
ClientSCMMetrics() {
registry = new MetricsRegistry("clientRequests");
LOG.debug("Initialized " + registry);
}
enum Singleton {
INSTANCE;
ClientSCMMetrics impl;
synchronized ClientSCMMetrics init(Configuration conf) {
if (impl == null) {
impl = create();
}
return impl;
}
}
public static ClientSCMMetrics initSingleton(Configuration conf) {
return Singleton.INSTANCE.init(conf);
}
public static ClientSCMMetrics getInstance() {
ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl;
if (topMetrics == null) {
throw new IllegalStateException(
"The ClientSCMMetrics singleton instance is not initialized."
+ " Have you called init first?");
}
return topMetrics;
}
static ClientSCMMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ClientSCMMetrics metrics = new ClientSCMMetrics();
ms.register("clientRequests", null, metrics);
return metrics;
}
@Metric("Number of cache hits") MutableCounterLong cacheHits;
@Metric("Number of cache misses") MutableCounterLong cacheMisses;
@Metric("Number of cache releases") MutableCounterLong cacheReleases;
/**
* One cache hit event
*/
public void incCacheHitCount() {
cacheHits.incr();
}
/**
* One cache miss event
*/
public void incCacheMissCount() {
cacheMisses.incr();
}
/**
* One cache release event
*/
public void incCacheRelease() {
cacheReleases.incr();
}
public long getCacheHits() { return cacheHits.value(); }
public long getCacheMisses() { return cacheMisses.value(); }
public long getCacheReleases() { return cacheReleases.value(); }
}

View File

@ -0,0 +1,278 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic unit tests for the Client to SCM Protocol Service.
*/
public class TestClientSCMProtocolService {
private static File testDir = null;
@BeforeClass
public static void setupTestDirs() throws IOException {
testDir = new File("target",
TestSharedCacheUploaderService.class.getCanonicalName());
testDir.delete();
testDir.mkdirs();
testDir = testDir.getAbsoluteFile();
}
@AfterClass
public static void cleanupTestDirs() throws IOException {
if (testDir != null) {
testDir.delete();
}
}
private ClientProtocolService service;
private ClientSCMProtocol clientSCMProxy;
private SCMStore store;
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@Before
public void startUp() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.SCM_STORE_CLASS,
InMemorySCMStore.class.getName());
conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
AppChecker appChecker = mock(AppChecker.class);
store = new InMemorySCMStore(appChecker);
store.init(conf);
store.start();
service = new ClientProtocolService(store);
service.init(conf);
service.start();
YarnRPC rpc = YarnRPC.create(new Configuration());
InetSocketAddress scmAddress =
conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
clientSCMProxy =
(ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress,
conf);
}
@After
public void cleanUp() {
if (store != null) {
store.stop();
store = null;
}
if (service != null) {
service.stop();
service = null;
}
if (clientSCMProxy != null) {
RPC.stopProxy(clientSCMProxy);
clientSCMProxy = null;
}
}
@Test
public void testUse_MissingEntry() throws Exception {
long misses = ClientSCMMetrics.getInstance().getCacheMisses();
UseSharedCacheResourceRequest request =
recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(1, 1L));
assertNull(clientSCMProxy.use(request).getPath());
assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
.getInstance().getCacheMisses() - misses);
}
@Test
public void testUse_ExistingEntry_NoAppIds() throws Exception {
// Pre-populate the SCM with one cache entry
store.addResource("key1", "foo.jar");
long hits = ClientSCMMetrics.getInstance().getCacheHits();
UseSharedCacheResourceRequest request =
recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(2, 2L));
// Expecting default depth of 3 and under the shared cache root dir
String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
assertEquals(1, store.getResourceReferences("key1").size());
assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
.getInstance().getCacheHits() - hits);
}
@Test
public void testUse_ExistingEntry_OneId() throws Exception {
// Pre-populate the SCM with one cache entry
store.addResource("key1", "foo.jar");
store.addResourceReference("key1",
new SharedCacheResourceReference(createAppId(1, 1L), "user"));
assertEquals(1, store.getResourceReferences("key1").size());
long hits = ClientSCMMetrics.getInstance().getCacheHits();
// Add a new distinct appId
UseSharedCacheResourceRequest request =
recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(2, 2L));
// Expecting default depth of 3 under the shared cache root dir
String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
assertEquals(2, store.getResourceReferences("key1").size());
assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
.getInstance().getCacheHits() - hits);
}
@Test
public void testUse_ExistingEntry_DupId() throws Exception {
// Pre-populate the SCM with one cache entry
store.addResource("key1", "foo.jar");
UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
store.addResourceReference("key1",
new SharedCacheResourceReference(createAppId(1, 1L),
testUGI.getShortUserName()));
assertEquals(1, store.getResourceReferences("key1").size());
long hits = ClientSCMMetrics.getInstance().getCacheHits();
// Add a new duplicate appId
UseSharedCacheResourceRequest request =
recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(1, 1L));
// Expecting default depth of 3 under the shared cache root dir
String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
assertEquals(1, store.getResourceReferences("key1").size());
assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
.getInstance().getCacheHits() - hits);
}
@Test
public void testRelease_ExistingEntry_NonExistantAppId() throws Exception {
// Pre-populate the SCM with one cache entry
store.addResource("key1", "foo.jar");
store.addResourceReference("key1",
new SharedCacheResourceReference(createAppId(1, 1L), "user"));
assertEquals(1, store.getResourceReferences("key1").size());
long releases = ClientSCMMetrics.getInstance().getCacheReleases();
ReleaseSharedCacheResourceRequest request =
recordFactory
.newRecordInstance(ReleaseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(2, 2L));
clientSCMProxy.release(request);
assertEquals(1, store.getResourceReferences("key1").size());
assertEquals(
"Client SCM metrics were updated when a release did not happen", 0,
ClientSCMMetrics.getInstance().getCacheReleases() - releases);
}
@Test
public void testRelease_ExistingEntry_WithAppId() throws Exception {
// Pre-populate the SCM with one cache entry
store.addResource("key1", "foo.jar");
UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
store.addResourceReference("key1",
new SharedCacheResourceReference(createAppId(1, 1L),
testUGI.getShortUserName()));
assertEquals(1, store.getResourceReferences("key1").size());
long releases = ClientSCMMetrics.getInstance().getCacheReleases();
ReleaseSharedCacheResourceRequest request =
recordFactory
.newRecordInstance(ReleaseSharedCacheResourceRequest.class);
request.setResourceKey("key1");
request.setAppId(createAppId(1, 1L));
clientSCMProxy.release(request);
assertEquals(0, store.getResourceReferences("key1").size());
assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
.getInstance().getCacheReleases() - releases);
}
@Test
public void testRelease_MissingEntry() throws Exception {
long releases = ClientSCMMetrics.getInstance().getCacheReleases();
ReleaseSharedCacheResourceRequest request =
recordFactory
.newRecordInstance(ReleaseSharedCacheResourceRequest.class);
request.setResourceKey("key2");
request.setAppId(createAppId(2, 2L));
clientSCMProxy.release(request);
assertNotNull(store.getResourceReferences("key2"));
assertEquals(0, store.getResourceReferences("key2").size());
assertEquals(
"Client SCM metrics were updated when a release did not happen.", 0,
ClientSCMMetrics.getInstance().getCacheReleases() - releases);
}
private ApplicationId createAppId(int id, long timestamp) {
return ApplicationId.newInstance(timestamp, id);
}
}