YARN-7250. Update Shared cache client api to use URLs.
(cherry picked from commit c114da5e64
)
This commit is contained in:
parent
a6630f703c
commit
3d23522110
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
@ -58,34 +59,25 @@ public abstract class SharedCacheClient extends AbstractService {
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The <code>SharedCacheManager</code> responds with whether or not the
|
* The <code>SharedCacheManager</code> responds with whether or not the
|
||||||
* resource exists in the cache. If the resource exists, a <code>Path</code>
|
* resource exists in the cache. If the resource exists, a <code>URL</code> to
|
||||||
* to the resource in the shared cache is returned. If the resource does not
|
* the resource in the shared cache is returned. If the resource does not
|
||||||
* exist, null is returned instead.
|
* exist, null is returned instead.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* Once a path has been returned for a resource, that path is safe to use for
|
* Once a URL has been returned for a resource, that URL is safe to use for
|
||||||
* the lifetime of the application that corresponds to the provided
|
* the lifetime of the application that corresponds to the provided
|
||||||
* ApplicationId.
|
* ApplicationId.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
|
||||||
* Additionally, a name for the resource should be specified. A fragment will
|
|
||||||
* be added to the path with the desired name if the desired name is different
|
|
||||||
* than the name of the provided path from the shared cache. This ensures that
|
|
||||||
* if the returned path is used to create a LocalResource, then the symlink
|
|
||||||
* created during YARN localization will match the name specified.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @param applicationId ApplicationId of the application using the resource
|
* @param applicationId ApplicationId of the application using the resource
|
||||||
* @param resourceKey the key (i.e. checksum) that identifies the resource
|
* @param resourceKey the key (i.e. checksum) that identifies the resource
|
||||||
* @param resourceName the desired name of the resource
|
* @return URL to the resource, or null if it does not exist
|
||||||
* @return Path to the resource, or null if it does not exist
|
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract Path use(ApplicationId applicationId, String resourceKey,
|
public abstract URL use(ApplicationId applicationId, String resourceKey)
|
||||||
String resourceName) throws YarnException;
|
throws YarnException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequ
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
|
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -113,8 +112,8 @@ public class SharedCacheClientImpl extends SharedCacheClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path use(ApplicationId applicationId, String resourceKey,
|
public URL use(ApplicationId applicationId, String resourceKey)
|
||||||
String resourceName) throws YarnException {
|
throws YarnException {
|
||||||
Path resourcePath = null;
|
Path resourcePath = null;
|
||||||
UseSharedCacheResourceRequest request = Records.newRecord(
|
UseSharedCacheResourceRequest request = Records.newRecord(
|
||||||
UseSharedCacheResourceRequest.class);
|
UseSharedCacheResourceRequest.class);
|
||||||
|
@ -132,31 +131,12 @@ public class SharedCacheClientImpl extends SharedCacheClient {
|
||||||
throw new YarnException(e);
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
if (resourcePath != null) {
|
if (resourcePath != null) {
|
||||||
if (resourcePath.getName().equals(resourceName)) {
|
URL pathURL = URL.fromPath(resourcePath);
|
||||||
// The preferred name is the same as the name of the item in the cache,
|
return pathURL;
|
||||||
// so we skip generating the fragment to save space in the MRconfig.
|
} else {
|
||||||
return resourcePath;
|
// The resource was not in the cache.
|
||||||
} else {
|
return null;
|
||||||
// We are using the shared cache, and a preferred name has been
|
|
||||||
// specified that is different than the name of the resource in the
|
|
||||||
// shared cache. We need to set the fragment portion of the URI to
|
|
||||||
// preserve the desired name.
|
|
||||||
URI pathURI = resourcePath.toUri();
|
|
||||||
try {
|
|
||||||
// We assume that there is no existing fragment in the URI since the
|
|
||||||
// shared cache manager does not use fragments.
|
|
||||||
pathURI =
|
|
||||||
new URI(pathURI.getScheme(), pathURI.getSchemeSpecificPart(),
|
|
||||||
resourceName);
|
|
||||||
resourcePath = new Path(pathURI);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new YarnException(
|
|
||||||
"Could not create a new URI due to syntax errors: "
|
|
||||||
+ pathURI.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return resourcePath;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -40,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -114,36 +114,21 @@ public class TestSharedCacheClientImpl {
|
||||||
response.setPath(null);
|
response.setPath(null);
|
||||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
||||||
response);
|
response);
|
||||||
Path newPath = client.use(mock(ApplicationId.class), "key", null);
|
URL newURL = client.use(mock(ApplicationId.class), "key");
|
||||||
assertNull("The path is not null!", newPath);
|
assertNull("The path is not null!", newURL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUseWithResourceName() throws Exception {
|
public void testUseCacheHit() throws Exception {
|
||||||
Path file = new Path("viewfs://test/path");
|
Path file = new Path("viewfs://test/path");
|
||||||
URI useUri = new URI("viewfs://test/path#linkName");
|
URL useUrl = URL.fromPath(new Path("viewfs://test/path"));
|
||||||
Path usePath = new Path(useUri);
|
|
||||||
UseSharedCacheResourceResponse response =
|
UseSharedCacheResourceResponse response =
|
||||||
new UseSharedCacheResourceResponsePBImpl();
|
new UseSharedCacheResourceResponsePBImpl();
|
||||||
response.setPath(file.toString());
|
response.setPath(file.toString());
|
||||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
||||||
response);
|
response);
|
||||||
Path newPath = client.use(mock(ApplicationId.class), "key", "linkName");
|
URL newURL = client.use(mock(ApplicationId.class), "key");
|
||||||
assertEquals("The paths are not equal!", usePath, newPath);
|
assertEquals("The paths are not equal!", useUrl, newURL);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUseWithSameResourceName() throws Exception {
|
|
||||||
Path file = new Path("viewfs://test/path");
|
|
||||||
URI useUri = new URI("viewfs://test/path");
|
|
||||||
Path usePath = new Path(useUri);
|
|
||||||
UseSharedCacheResourceResponse response =
|
|
||||||
new UseSharedCacheResourceResponsePBImpl();
|
|
||||||
response.setPath(file.toString());
|
|
||||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
|
||||||
response);
|
|
||||||
Path newPath = client.use(mock(ApplicationId.class), "key", "path");
|
|
||||||
assertEquals("The paths are not equal!", usePath, newPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = YarnException.class)
|
@Test(expected = YarnException.class)
|
||||||
|
@ -151,7 +136,7 @@ public class TestSharedCacheClientImpl {
|
||||||
String message = "Mock IOExcepiton!";
|
String message = "Mock IOExcepiton!";
|
||||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
|
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
|
||||||
new IOException(message));
|
new IOException(message));
|
||||||
client.use(mock(ApplicationId.class), "key", null);
|
client.use(mock(ApplicationId.class), "key");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue