diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9073cb43422..6ab31345228 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -41,6 +41,9 @@ Release 2.7.0 - UNRELEASED YARN-2427. Added the API of moving apps between queues in RM web services. (Varun Vasudev via zjshen) + YARN-2217. [YARN-1492] Shared cache client side changes. + (Chris Trezzo via kasha) + IMPROVEMENTS YARN-2950. Change message to mandate, not suggest JS requirement on UI. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java new file mode 100644 index 00000000000..7cbe0e164b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This is the client for YARN's shared cache. + */ +@Public +@Unstable +public abstract class SharedCacheClient extends AbstractService { + + @Public + public static SharedCacheClient createSharedCacheClient() { + SharedCacheClient client = new SharedCacheClientImpl(); + return client; + } + + @Private + public SharedCacheClient(String name) { + super(name); + } + + /** + *

+ * The method to claim a resource with the SharedCacheManager. + * The client uses a checksum to identify the resource and an + * {@link ApplicationId} to identify which application will be using the + * resource. + *

+ * + *

+ * The SharedCacheManager responds with whether or not the + * resource exists in the cache. If the resource exists, a Path + * to the resource in the shared cache is returned. If the resource does not + * exist, null is returned instead. + *

+ * + * @param applicationId ApplicationId of the application using the resource + * @param resourceKey the key (i.e. checksum) that identifies the resource + * @return Path to the resource, or null if it does not exist + */ + @Public + @Unstable + public abstract Path use(ApplicationId applicationId, String resourceKey) + throws YarnException; + + /** + *

+ * The method to release a resource with the SharedCacheManager. + * This method is called once an application is no longer using a claimed + * resource in the shared cache. The client uses a checksum to identify the + * resource and an {@link ApplicationId} to identify which application is + * releasing the resource. + *

+ * + *

+ * Note: This method is an optimization and the client is not required to call + * it for correctness. + *

+ * + * @param applicationId ApplicationId of the application releasing the + * resource + * @param resourceKey the key (i.e. checksum) that identifies the resource + */ + @Public + @Unstable + public abstract void release(ApplicationId applicationId, String resourceKey) + throws YarnException; + + /** + * A convenience method to calculate the checksum of a specified file. + * + * @param sourceFile A path to the input file + * @return A hex string containing the checksum digest + * @throws IOException + */ + @Public + @Unstable + public abstract String getFileChecksum(Path sourceFile) throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java new file mode 100644 index 00000000000..0a61ee03c8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.SharedCacheClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum; +import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory; +import org.apache.hadoop.yarn.util.Records; + +import com.google.common.annotations.VisibleForTesting; + +/** + * An implementation of the SharedCacheClient API. + */ +@Private +@Unstable +public class SharedCacheClientImpl extends SharedCacheClient { + private static final Log LOG = LogFactory + .getLog(SharedCacheClientImpl.class); + + private ClientSCMProtocol scmClient; + private InetSocketAddress scmAddress; + private Configuration conf; + private SharedCacheChecksum checksum; + + public SharedCacheClientImpl() { + super(SharedCacheClientImpl.class.getName()); + } + + private static InetSocketAddress getScmAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (this.scmAddress == null) { + this.scmAddress = getScmAddress(conf); + } + this.conf = conf; + this.checksum = SharedCacheChecksumFactory.getChecksum(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.scmClient = createClientProxy(); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to Shared Cache Manager at " + this.scmAddress); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + stopClientProxy(); + super.serviceStop(); + } + + @VisibleForTesting + protected ClientSCMProtocol createClientProxy() { + YarnRPC rpc = YarnRPC.create(getConfig()); + return (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, + this.scmAddress, getConfig()); + } + + @VisibleForTesting + protected void stopClientProxy() { + if (this.scmClient != null) { + RPC.stopProxy(this.scmClient); + this.scmClient = null; + } + } + + @Override + public Path use(ApplicationId applicationId, String resourceKey) + throws YarnException { + Path resourcePath = null; + UseSharedCacheResourceRequest request = Records.newRecord( + UseSharedCacheResourceRequest.class); + request.setAppId(applicationId); + request.setResourceKey(resourceKey); + try { + UseSharedCacheResourceResponse response = this.scmClient.use(request); + if (response != null && response.getPath() != null) { + resourcePath = new Path(response.getPath()); + } + } catch (Exception e) { + // Just catching IOException isn't enough. + // RPC call can throw ConnectionException. + // We don't handle different exceptions separately at this point. + throw new YarnException(e); + } + return resourcePath; + } + + @Override + public void release(ApplicationId applicationId, String resourceKey) + throws YarnException { + ReleaseSharedCacheResourceRequest request = Records.newRecord( + ReleaseSharedCacheResourceRequest.class); + request.setAppId(applicationId); + request.setResourceKey(resourceKey); + try { + // We do not care about the response because it is empty. + this.scmClient.release(request); + } catch (Exception e) { + // Just catching IOException isn't enough. + // RPC call can throw ConnectionException. + throw new YarnException(e); + } + } + + @Override + public String getFileChecksum(Path sourceFile) + throws IOException { + FileSystem fs = sourceFile.getFileSystem(this.conf); + FSDataInputStream in = null; + try { + in = fs.open(sourceFile); + return this.checksum.computeChecksum(in); + } finally { + if (in != null) { + in.close(); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java new file mode 100644 index 00000000000..3985e544476 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSharedCacheClientImpl { + + private static final Log LOG = LogFactory + .getLog(TestSharedCacheClientImpl.class); + + public static SharedCacheClientImpl client; + public static ClientSCMProtocol cProtocol; + private static Path TEST_ROOT_DIR; + private static FileSystem localFs; + private static String input = "This is a test file."; + private static String inputChecksumSHA256 = + "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de"; + + @BeforeClass + public static void beforeClass() throws IOException { + localFs = FileSystem.getLocal(new Configuration()); + TEST_ROOT_DIR = + new Path("target", TestSharedCacheClientImpl.class.getName() + + "-tmpDir").makeQualified(localFs.getUri(), + localFs.getWorkingDirectory()); + } + + @AfterClass + public static void afterClass() { + try { + if (localFs != null) { + localFs.close(); + } + } catch (IOException ioe) { + LOG.info("IO exception in closing file system)"); + ioe.printStackTrace(); + } + } + + @Before + public void setup() { + cProtocol = mock(ClientSCMProtocol.class); + client = new SharedCacheClientImpl() { + @Override + protected ClientSCMProtocol createClientProxy() { + return cProtocol; + } + + @Override + protected void stopClientProxy() { + // do nothing because it is mocked + } + }; + client.init(new Configuration()); + client.start(); + } + + @After + public void cleanup() { + if (client != null) { + client.stop(); + client = null; + } + } + + @Test + public void testUse() throws Exception { + Path file = new Path("viewfs://test/path"); + UseSharedCacheResourceResponse response = + new UseSharedCacheResourceResponsePBImpl(); + response.setPath(file.toString()); + when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn( + response); + Path newPath = client.use(mock(ApplicationId.class), "key"); + assertEquals(file, newPath); + } + + @Test(expected = YarnException.class) + public void testUseError() throws Exception { + String message = "Mock IOExcepiton!"; + when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow( + new IOException(message)); + client.use(mock(ApplicationId.class), "key"); + } + + @Test + public void testRelease() throws Exception { + // Release does not care about the return value because it is empty + when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class))) + .thenReturn(null); + client.release(mock(ApplicationId.class), "key"); + } + + @Test(expected = YarnException.class) + public void testReleaseError() throws Exception { + String message = "Mock IOExcepiton!"; + when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class))) + .thenThrow(new IOException(message)); + client.release(mock(ApplicationId.class), "key"); + } + + @Test + public void testChecksum() throws Exception { + String filename = "test1.txt"; + Path file = makeFile(filename); + assertEquals(inputChecksumSHA256, client.getFileChecksum(file)); + } + + @Test(expected = FileNotFoundException.class) + public void testNonexistantFileChecksum() throws Exception { + Path file = new Path(TEST_ROOT_DIR, "non-existant-file"); + client.getFileChecksum(file); + } + + private Path makeFile(String filename) throws Exception { + Path file = new Path(TEST_ROOT_DIR, filename); + DataOutputStream out = null; + try { + out = localFs.create(file); + out.write(input.getBytes("UTF-8")); + } finally { + if(out != null) { + out.close(); + } + } + return file; + } +}