diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 011083cb871..922b6fa9edc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -42,6 +42,9 @@ Release 2.7.0 - UNRELEASED YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) + YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. + (Chris Trezzo and Sangjin Lee via kasha) + IMPROVEMENTS YARN-1979. TestDirectoryCollection fails when the umask is unusual. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d8ed541dbe5..b459ee34e67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -1446,6 +1445,21 @@ public class YarnConfiguration extends Configuration { SCM_CLEANER_PREFIX + "resource-sleep-ms"; public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L; + /** The address of the node manager interface in the SCM. */ + public static final String SCM_UPLOADER_SERVER_ADDRESS = SHARED_CACHE_PREFIX + + "uploader.server.address"; + public static final int DEFAULT_SCM_UPLOADER_SERVER_PORT = 8046; + public static final String DEFAULT_SCM_UPLOADER_SERVER_ADDRESS = "0.0.0.0:" + + DEFAULT_SCM_UPLOADER_SERVER_PORT; + + /** + * The number of SCM threads used to handle notify requests from the node + * manager. + */ + public static final String SCM_UPLOADER_SERVER_THREAD_COUNT = + SHARED_CACHE_PREFIX + "uploader.server.thread-count"; + public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index d02931fdf0e..1e7d5443cab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1434,6 +1434,20 @@ 0 + + The address of the node manager interface in the SCM + (shared cache manager) + yarn.sharedcache.uploader.server.address + 0.0.0.0:8046 + + + + The number of threads used to handle shared cache manager + requests from the node manager (50 by default) + yarn.sharedcache.uploader.server.thread-count + 50 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index acf330fd1d8..35eacc53731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -135,6 +135,7 @@ yarn_server_common_service_protos.proto yarn_server_common_service_protos.proto ResourceTracker.proto + SCMUploader.proto ${project.build.directory}/generated-sources/java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java new file mode 100644 index 00000000000..937f648510c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; + +/** + *

+ * The protocol between a NodeManager's + * SharedCacheUploadService and the + * SharedCacheManager. + *

+ */ +@Private +@Unstable +public interface SCMUploaderProtocol { + /** + *

+ * The method used by the NodeManager's SharedCacheUploadService + * to notify the shared cache manager of a newly cached resource. + *

+ * + *

+ * The SharedCacheManager responds with whether or not the + * NodeManager should delete the uploaded file. + *

+ * + * @param request notify the shared cache manager of a newly uploaded resource + * to the shared cache + * @return response indicating if the newly uploaded resource should be + * deleted + * @throws YarnException + * @throws IOException + */ + public SCMUploaderNotifyResponse + notify(SCMUploaderNotifyRequest request) + throws YarnException, IOException; + + /** + *

+ * The method used by the NodeManager's SharedCacheUploadService + * to request whether a resource can be uploaded. + *

+ * + *

+ * The SharedCacheManager responds with whether or not the + * NodeManager can upload the file. + *

+ * + * @param request whether the resource can be uploaded to the shared cache + * @return response indicating if resource can be uploaded to the shared cache + * @throws YarnException + * @throws IOException + */ + public SCMUploaderCanUploadResponse + canUpload(SCMUploaderCanUploadRequest request) + throws YarnException, IOException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java new file mode 100644 index 00000000000..5099b7d6676 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.SCMUploaderProtocol.SCMUploaderProtocolService; + +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB", + protocolVersion = 1) +public interface SCMUploaderProtocolPB extends + SCMUploaderProtocolService.BlockingInterface { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java new file mode 100644 index 00000000000..31e48688849 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class SCMUploaderProtocolPBClientImpl implements + SCMUploaderProtocol, Closeable { + + private SCMUploaderProtocolPB proxy; + + public SCMUploaderProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, SCMUploaderProtocolPB.class, + ProtobufRpcEngine.class); + proxy = + RPC.getProxy(SCMUploaderProtocolPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + this.proxy = null; + } + } + + @Override + public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request) + throws YarnException, IOException { + SCMUploaderNotifyRequestProto requestProto = + ((SCMUploaderNotifyRequestPBImpl) request).getProto(); + try { + return new SCMUploaderNotifyResponsePBImpl(proxy.notify(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public SCMUploaderCanUploadResponse canUpload( + SCMUploaderCanUploadRequest request) throws YarnException, IOException { + SCMUploaderCanUploadRequestProto requestProto = + ((SCMUploaderCanUploadRequestPBImpl)request).getProto(); + try { + return new SCMUploaderCanUploadResponsePBImpl(proxy.canUpload(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java new file mode 100644 index 00000000000..db6c58cf0e3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class SCMUploaderProtocolPBServiceImpl implements + SCMUploaderProtocolPB { + + private SCMUploaderProtocol real; + + public SCMUploaderProtocolPBServiceImpl(SCMUploaderProtocol impl) { + this.real = impl; + } + + @Override + public SCMUploaderNotifyResponseProto notify(RpcController controller, + SCMUploaderNotifyRequestProto proto) throws ServiceException { + SCMUploaderNotifyRequestPBImpl request = + new SCMUploaderNotifyRequestPBImpl(proto); + try { + SCMUploaderNotifyResponse response = real.notify(request); + return ((SCMUploaderNotifyResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SCMUploaderCanUploadResponseProto canUpload(RpcController controller, + SCMUploaderCanUploadRequestProto proto) + throws ServiceException { + SCMUploaderCanUploadRequestPBImpl request = + new SCMUploaderCanUploadRequestPBImpl(proto); + try { + SCMUploaderCanUploadResponse response = real.canUpload(request); + return ((SCMUploaderCanUploadResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java new file mode 100644 index 00000000000..bb6718a7724 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The request from the NodeManager to the SharedCacheManager that + * requests whether it can upload a resource in the shared cache. + *

+ */ +@Private +@Unstable +public abstract class SCMUploaderCanUploadRequest { + + /** + * Get the key of the resource that would be uploaded to the + * shared cache. + * + * @return key + */ + public abstract String getResourceKey(); + + /** + * Set the key of the resource that would be uploaded to the + * shared cache. + * + * @param key unique identifier for the resource + */ + public abstract void setResourceKey(String key); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java new file mode 100644 index 00000000000..5fb4626963e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The response from the SharedCacheManager to the NodeManager that indicates + * whether the NodeManager can upload the resource to the shared cache. If it is + * not accepted by SCM, the NodeManager should not upload it to the shared + * cache. + *

+ */ +@Private +@Unstable +public abstract class SCMUploaderCanUploadResponse { + + /** + * Get whether or not the node manager can upload the resource to the shared + * cache. + * + * @return boolean True if the resource can be uploaded, false otherwise. + */ + public abstract boolean getUploadable(); + + /** + * Set whether or not the node manager can upload the resource to the shared + * cache. + * + * @param b True if the resource can be uploaded, false otherwise. + */ + public abstract void setUploadable(boolean b); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java new file mode 100644 index 00000000000..c72453c568c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The request from the NodeManager to the SharedCacheManager that + * notifies that a resource has been uploaded to the shared cache. The + * SharedCacheManager may reject the resource for various reasons, + * in which case the NodeManager should remove it from the shared cache. + *

+ */ +@Private +@Unstable +public abstract class SCMUploaderNotifyRequest { + + /** + * Get the filename of the resource that was just uploaded to the shared + * cache. + * + * @return the filename + */ + public abstract String getFileName(); + + /** + * Set the filename of the resource that was just uploaded to the shared + * cache. + * + * @param filename the name of the file + */ + public abstract void setFilename(String filename); + + /** + * Get the key of the resource that was just uploaded to the + * shared cache. + * + * @return key + */ + public abstract String getResourceKey(); + + /** + * Set the key of the resource that was just uploaded to the + * shared cache. + * + * @param key unique identifier for the resource + */ + public abstract void setResourceKey(String key); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java new file mode 100644 index 00000000000..83d7913ddcf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The response from the SharedCacheManager to the NodeManager that indicates + * whether the NodeManager needs to delete the cached resource it was sending + * the notification for. + *

+ */ +@Private +@Unstable +public abstract class SCMUploaderNotifyResponse { + + /** + * Get whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @return boolean True if the resource has been accepted, false otherwise. + */ + public abstract boolean getAccepted(); + + /** + * Set whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @param b True if the resource has been accepted, false otherwise. + */ + public abstract void setAccepted(boolean b); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java new file mode 100644 index 00000000000..d350fb97dc6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest; + +public class SCMUploaderCanUploadRequestPBImpl + extends SCMUploaderCanUploadRequest { + SCMUploaderCanUploadRequestProto proto = + SCMUploaderCanUploadRequestProto.getDefaultInstance(); + SCMUploaderCanUploadRequestProto.Builder builder = null; + boolean viaProto = false; + + public SCMUploaderCanUploadRequestPBImpl() { + builder = SCMUploaderCanUploadRequestProto.newBuilder(); + } + + public SCMUploaderCanUploadRequestPBImpl( + SCMUploaderCanUploadRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SCMUploaderCanUploadRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getResourceKey() { + SCMUploaderCanUploadRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SCMUploaderCanUploadRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java new file mode 100644 index 00000000000..3f44e2a009f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse; + +public class SCMUploaderCanUploadResponsePBImpl + extends SCMUploaderCanUploadResponse { + SCMUploaderCanUploadResponseProto proto = + SCMUploaderCanUploadResponseProto.getDefaultInstance(); + SCMUploaderCanUploadResponseProto.Builder builder = null; + boolean viaProto = false; + + public SCMUploaderCanUploadResponsePBImpl() { + builder = SCMUploaderCanUploadResponseProto.newBuilder(); + } + + public SCMUploaderCanUploadResponsePBImpl( + SCMUploaderCanUploadResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SCMUploaderCanUploadResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public boolean getUploadable() { + SCMUploaderCanUploadResponseProtoOrBuilder p = viaProto ? proto : builder; + // Default to true, when in doubt allow the upload + return (p.hasUploadable()) ? p.getUploadable() : true; + } + + @Override + public void setUploadable(boolean b) { + maybeInitBuilder(); + builder.setUploadable(b); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SCMUploaderCanUploadResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java new file mode 100644 index 00000000000..9b52b115c00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; + +public class SCMUploaderNotifyRequestPBImpl extends SCMUploaderNotifyRequest { + SCMUploaderNotifyRequestProto proto = + SCMUploaderNotifyRequestProto.getDefaultInstance(); + SCMUploaderNotifyRequestProto.Builder builder = null; + boolean viaProto = false; + + public SCMUploaderNotifyRequestPBImpl() { + builder = SCMUploaderNotifyRequestProto.newBuilder(); + } + + public SCMUploaderNotifyRequestPBImpl( + SCMUploaderNotifyRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SCMUploaderNotifyRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getResourceKey() { + SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + @Override + public String getFileName() { + SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasFilename()) ? p.getFilename() : null; + } + + @Override + public void setFilename(String filename) { + maybeInitBuilder(); + if (filename == null) { + builder.clearFilename(); + return; + } + builder.setFilename(filename); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SCMUploaderNotifyRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java new file mode 100644 index 00000000000..c899bbe6296 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; + +public class SCMUploaderNotifyResponsePBImpl extends SCMUploaderNotifyResponse { + SCMUploaderNotifyResponseProto proto = + SCMUploaderNotifyResponseProto.getDefaultInstance(); + SCMUploaderNotifyResponseProto.Builder builder = null; + boolean viaProto = false; + + public SCMUploaderNotifyResponsePBImpl() { + builder = SCMUploaderNotifyResponseProto.newBuilder(); + } + + public SCMUploaderNotifyResponsePBImpl(SCMUploaderNotifyResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SCMUploaderNotifyResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public boolean getAccepted() { + SCMUploaderNotifyResponseProtoOrBuilder p = viaProto ? proto : builder; + // Default to true, when in doubt just leave the file in the cache + return (p.hasAccepted()) ? p.getAccepted() : true; + } + + @Override + public void setAccepted(boolean b) { + maybeInitBuilder(); + builder.setAccepted(b); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SCMUploaderNotifyResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto new file mode 100644 index 00000000000..227842269e1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "SCMUploaderProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_server_common_service_protos.proto"; + +service SCMUploaderProtocolService { + rpc notify(SCMUploaderNotifyRequestProto) returns (SCMUploaderNotifyResponseProto); + rpc canUpload(SCMUploaderCanUploadRequestProto) returns (SCMUploaderCanUploadResponseProto); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index f2d01ada027..91473c5a2de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -75,4 +75,21 @@ message NMContainerStatusProto { optional string diagnostics = 5 [default = "N/A"]; optional int32 container_exit_status = 6; optional int64 creation_time = 7; -} \ No newline at end of file +} + +message SCMUploaderNotifyRequestProto { + optional string resource_key = 1; + optional string filename = 2; +} + +message SCMUploaderNotifyResponseProto { + optional bool accepted = 1; +} + +message SCMUploaderCanUploadRequestProto { + optional string resource_key = 1; +} + +message SCMUploaderCanUploadResponseProto { + optional bool uploadable = 1; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 3fdb5882103..ab50727b8dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -67,6 +67,10 @@ public class SharedCacheManager extends CompositeService { CleanerService cs = createCleanerService(store); addService(cs); + SharedCacheUploaderService nms = + createNMCacheUploaderSCMProtocolService(store); + addService(nms); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -97,6 +101,11 @@ public class SharedCacheManager extends CompositeService { return new CleanerService(store); } + private SharedCacheUploaderService + createNMCacheUploaderSCMProtocolService(SCMStore store) { + return new SharedCacheUploaderService(store); + } + @Override protected void serviceStop() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java new file mode 100644 index 00000000000..f9494386b02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * This service handles all rpc calls from the NodeManager uploader to the + * shared cache manager. + */ +public class SharedCacheUploaderService extends AbstractService + implements SCMUploaderProtocol { + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress bindAddress; + private final SCMStore store; + private SharedCacheUploaderMetrics metrics; + + public SharedCacheUploaderService(SCMStore store) { + super(SharedCacheUploaderService.class.getName()); + this.store = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.bindAddress = getBindAddress(conf); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = SharedCacheUploaderMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(SCMUploaderProtocol.class, this, bindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_UPLOADER_SERVER_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT)); + + // TODO (YARN-2774): Enable service authorization + + this.server.start(); + bindAddress = + conf.updateConnectAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + this.server = null; + } + + super.serviceStop(); + } + + @Override + public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request) + throws YarnException, IOException { + SCMUploaderNotifyResponse response = + recordFactory.newRecordInstance(SCMUploaderNotifyResponse.class); + + // TODO (YARN-2774): proper security/authorization needs to be implemented + + String filename = + store.addResource(request.getResourceKey(), request.getFileName()); + + boolean accepted = filename.equals(request.getFileName()); + + if (accepted) { + this.metrics.incAcceptedUploads(); + } else { + this.metrics.incRejectedUploads(); + } + + response.setAccepted(accepted); + + return response; + } + + @Override + public SCMUploaderCanUploadResponse canUpload( + SCMUploaderCanUploadRequest request) throws YarnException, IOException { + // TODO (YARN-2781): we may want to have a more flexible policy of + // instructing the node manager to upload only if it meets a certain + // criteria + // until then we return true for now + SCMUploaderCanUploadResponse response = + recordFactory.newRecordInstance(SCMUploaderCanUploadResponse.class); + response.setUploadable(true); + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java new file mode 100644 index 00000000000..6fd816f29a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining shared cache uploader requests metrics + * and publishing them through the metrics interfaces. + */ +@Private +@Evolving +@Metrics(about="shared cache upload metrics", context="yarn") +public class SharedCacheUploaderMetrics { + + static final Log LOG = + LogFactory.getLog(SharedCacheUploaderMetrics.class); + final MetricsRegistry registry; + + SharedCacheUploaderMetrics() { + registry = new MetricsRegistry("SharedCacheUploaderRequests"); + LOG.debug("Initialized "+ registry); + } + + enum Singleton { + INSTANCE; + + SharedCacheUploaderMetrics impl; + + synchronized SharedCacheUploaderMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static SharedCacheUploaderMetrics + initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static SharedCacheUploaderMetrics getInstance() { + SharedCacheUploaderMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The SharedCacheUploaderMetrics singleton instance is not" + + "initialized. Have you called init first?"); + return topMetrics; + } + + static SharedCacheUploaderMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + SharedCacheUploaderMetrics metrics = + new SharedCacheUploaderMetrics(); + ms.register("SharedCacheUploaderRequests", null, metrics); + return metrics; + } + + @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads; + @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads; + + /** + * One accepted upload event + */ + public void incAcceptedUploads() { + acceptedUploads.incr(); + } + + /** + * One rejected upload event + */ + public void incRejectedUploads() { + rejectedUploads.incr(); + } + + public long getAcceptedUploads() { return acceptedUploads.value(); } + public long getRejectUploads() { return rejectedUploads.value(); } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java new file mode 100644 index 00000000000..1cb0663c4ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Basic unit tests for the NodeManger to SCM Protocol Service. + */ +public class TestSharedCacheUploaderService { + private static File testDir = null; + + @BeforeClass + public static void setupTestDirs() throws IOException { + testDir = new File("target", + TestSharedCacheUploaderService.class.getCanonicalName()); + testDir.delete(); + testDir.mkdirs(); + testDir = testDir.getAbsoluteFile(); + } + + @AfterClass + public static void cleanupTestDirs() throws IOException { + if (testDir != null) { + testDir.delete(); + } + } + + private SharedCacheUploaderService service; + private SCMUploaderProtocol proxy; + private SCMStore store; + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @Before + public void startUp() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCM_STORE_CLASS, + InMemorySCMStore.class.getName()); + conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath()); + AppChecker appChecker = mock(AppChecker.class); + store = new InMemorySCMStore(appChecker); + store.init(conf); + store.start(); + + service = new SharedCacheUploaderService(store); + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(new Configuration()); + + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT); + + proxy = + (SCMUploaderProtocol) rpc.getProxy( + SCMUploaderProtocol.class, scmAddress, conf); + } + + @After + public void cleanUp() { + if (store != null) { + store.stop(); + } + + if (service != null) { + service.stop(); + } + + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + + @Test + public void testNotify_noEntry() throws Exception { + long accepted = + SharedCacheUploaderMetrics.getInstance().getAcceptedUploads(); + + SCMUploaderNotifyRequest request = + recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class); + request.setResourceKey("key1"); + request.setFilename("foo.jar"); + assertTrue(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + + assertEquals( + "NM upload metrics aren't updated.", 1, + SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() - + accepted); + + } + + @Test + public void testNotify_entryExists_differentName() throws Exception { + + long rejected = + SharedCacheUploaderMetrics.getInstance().getRejectUploads(); + + store.addResource("key1", "foo.jar"); + SCMUploaderNotifyRequest request = + recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class); + request.setResourceKey("key1"); + request.setFilename("foobar.jar"); + assertFalse(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + assertEquals( + "NM upload metrics aren't updated.", 1, + SharedCacheUploaderMetrics.getInstance().getRejectUploads() - + rejected); + + } + + @Test + public void testNotify_entryExists_sameName() throws Exception { + + long accepted = + SharedCacheUploaderMetrics.getInstance().getAcceptedUploads(); + + store.addResource("key1", "foo.jar"); + SCMUploaderNotifyRequest request = + recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class); + request.setResourceKey("key1"); + request.setFilename("foo.jar"); + assertTrue(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + assertEquals( + "NM upload metrics aren't updated.", 1, + SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() - + accepted); + + } +}