YARN-7244. ShuffleHandler is not aware of disks that are added. Contributed by Kuhu Shukla
This commit is contained in:
parent
2d4629f386
commit
9d3ea2c0ed
@ -57,7 +57,6 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
@ -83,7 +82,6 @@
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
@ -838,8 +836,6 @@ class Shuffle extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final Configuration conf;
|
||||
private final IndexCache indexCache;
|
||||
private final LocalDirAllocator lDirAlloc =
|
||||
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
|
||||
private int port;
|
||||
|
||||
public Shuffle(Configuration conf) {
|
||||
@ -1064,13 +1060,14 @@ private String getBaseLocation(String jobId, String user) {
|
||||
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
|
||||
int reduce, String user) throws IOException {
|
||||
// Index file
|
||||
Path indexFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
|
||||
Path indexFileName = getAuxiliaryLocalPathHandler()
|
||||
.getLocalPathForRead(base + "/file.out.index");
|
||||
IndexRecord info =
|
||||
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
|
||||
|
||||
Path mapOutputFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
|
||||
getAuxiliaryLocalPathHandler().getLocalPathForRead(base +
|
||||
"/file.out");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
|
||||
}
|
||||
@ -1092,7 +1089,8 @@ protected void populateHeaders(List<String> mapIds, String jobId,
|
||||
}
|
||||
// Index file
|
||||
Path indexFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
|
||||
getAuxiliaryLocalPathHandler().getLocalPathForRead(
|
||||
base + "/file.out.index");
|
||||
IndexRecord info =
|
||||
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
|
||||
ShuffleHeader header =
|
||||
|
@ -70,12 +70,15 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.ServiceStateException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.PureJavaCrc32;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
@ -92,6 +95,9 @@
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.mockito.Mockito;
|
||||
@ -100,8 +106,12 @@
|
||||
public class TestShuffleHandler {
|
||||
static final long MiB = 1024 * 1024;
|
||||
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
|
||||
private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir(
|
||||
TestShuffleHandler.class.getSimpleName() + "LocDir");
|
||||
|
||||
class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
|
||||
private AuxiliaryLocalPathHandler pathHandler =
|
||||
new TestAuxiliaryLocalPathHandler();
|
||||
@Override
|
||||
protected Shuffle getShuffle(final Configuration conf) {
|
||||
return new Shuffle(conf) {
|
||||
@ -141,11 +151,35 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
|
||||
return pathHandler;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler {
|
||||
boolean socketKeepAlive = false;
|
||||
private class TestAuxiliaryLocalPathHandler
|
||||
implements AuxiliaryLocalPathHandler {
|
||||
@Override
|
||||
public Path getLocalPathForRead(String path) throws IOException {
|
||||
return new Path(ABS_LOG_DIR.getAbsolutePath(), path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getLocalPathForWrite(String path) throws IOException {
|
||||
return new Path(ABS_LOG_DIR.getAbsolutePath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getLocalPathForWrite(String path, long size)
|
||||
throws IOException {
|
||||
return new Path(ABS_LOG_DIR.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockShuffleHandler2 extends
|
||||
org.apache.hadoop.mapred.ShuffleHandler {
|
||||
boolean socketKeepAlive = false;
|
||||
@Override
|
||||
protected Shuffle getShuffle(final Configuration conf) {
|
||||
return new Shuffle(conf) {
|
||||
@ -479,6 +513,11 @@ public void testSocketKeepAlive() throws Exception {
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
|
||||
HttpURLConnection conn = null;
|
||||
MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
|
||||
AuxiliaryLocalPathHandler pathHandler =
|
||||
mock(AuxiliaryLocalPathHandler.class);
|
||||
when(pathHandler.getLocalPathForRead(anyString())).thenThrow(
|
||||
new DiskChecker.DiskErrorException("Test"));
|
||||
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
try {
|
||||
shuffleHandler.init(conf);
|
||||
shuffleHandler.start();
|
||||
@ -661,19 +700,16 @@ public void testMapFileAccess() throws IOException {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
File absLogDir = new File("target",
|
||||
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 1);
|
||||
LOG.info(appId.toString());
|
||||
String appAttemptId = "attempt_12345_1_m_1_0";
|
||||
String user = "randomUser";
|
||||
String reducerId = "0";
|
||||
List<File> fileMap = new ArrayList<File>();
|
||||
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
|
||||
createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
|
||||
conf, fileMap);
|
||||
ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
||||
|
||||
@Override
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
@ -689,6 +725,8 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
};
|
||||
}
|
||||
};
|
||||
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
|
||||
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffleHandler.init(conf);
|
||||
try {
|
||||
shuffleHandler.start();
|
||||
@ -733,7 +771,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
Assert.assertTrue((new String(byteArr)).contains(message));
|
||||
} finally {
|
||||
shuffleHandler.stop();
|
||||
FileUtil.fullyDelete(absLogDir);
|
||||
FileUtil.fullyDelete(ABS_LOG_DIR);
|
||||
}
|
||||
}
|
||||
|
||||
@ -794,10 +832,13 @@ public void testRecovery() throws IOException {
|
||||
final File tmpDir = new File(System.getProperty("test.build.data",
|
||||
System.getProperty("java.io.tmpdir")),
|
||||
TestShuffleHandler.class.getName());
|
||||
ShuffleHandler shuffle = new ShuffleHandler();
|
||||
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
|
||||
ShuffleHandler shuffle = new ShuffleHandler();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
|
||||
// emulate aux services startup with recovery enabled
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
tmpDir.mkdirs();
|
||||
@ -823,6 +864,7 @@ public void testRecovery() throws IOException {
|
||||
// emulate shuffle handler restart
|
||||
shuffle.close();
|
||||
shuffle = new ShuffleHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
shuffle.init(conf);
|
||||
shuffle.start();
|
||||
@ -865,6 +907,9 @@ public void testRecoveryFromOtherVersions() throws IOException {
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
|
||||
ShuffleHandler shuffle = new ShuffleHandler();
|
||||
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
|
||||
// emulate aux services startup with recovery enabled
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
tmpDir.mkdirs();
|
||||
@ -890,6 +935,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
|
||||
// emulate shuffle handler restart
|
||||
shuffle.close();
|
||||
shuffle = new ShuffleHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
shuffle.init(conf);
|
||||
shuffle.start();
|
||||
@ -907,6 +953,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
|
||||
Assert.assertEquals(version11, shuffle.loadVersion());
|
||||
shuffle.close();
|
||||
shuffle = new ShuffleHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
shuffle.init(conf);
|
||||
shuffle.start();
|
||||
@ -923,6 +970,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
|
||||
Assert.assertEquals(version21, shuffle.loadVersion());
|
||||
shuffle.close();
|
||||
shuffle = new ShuffleHandler();
|
||||
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
|
||||
shuffle.init(conf);
|
||||
|
||||
@ -972,16 +1020,15 @@ public void testGetMapOutputInfo() throws Exception {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"simple");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
File absLogDir = new File("target", TestShuffleHandler.class.
|
||||
getSimpleName() + "LocDir").getAbsoluteFile();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 1);
|
||||
String appAttemptId = "attempt_12345_1_m_1_0";
|
||||
String user = "randomUser";
|
||||
String reducerId = "0";
|
||||
List<File> fileMap = new ArrayList<File>();
|
||||
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
|
||||
createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
|
||||
conf, fileMap);
|
||||
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
|
||||
ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
||||
@Override
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
@ -1025,6 +1072,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
};
|
||||
}
|
||||
};
|
||||
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
|
||||
shuffleHandler.init(conf);
|
||||
try {
|
||||
shuffleHandler.start();
|
||||
@ -1063,7 +1111,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
0, failures.size());
|
||||
} finally {
|
||||
shuffleHandler.stop();
|
||||
FileUtil.fullyDelete(absLogDir);
|
||||
FileUtil.fullyDelete(ABS_LOG_DIR);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1073,10 +1121,10 @@ public void testSendMapCount() throws Exception {
|
||||
new ArrayList<ShuffleHandler.ReduceMapFileCount>();
|
||||
|
||||
final ChannelHandlerContext mockCtx =
|
||||
Mockito.mock(ChannelHandlerContext.class);
|
||||
final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
|
||||
final Channel mockCh = Mockito.mock(AbstractChannel.class);
|
||||
final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
|
||||
mock(ChannelHandlerContext.class);
|
||||
final MessageEvent mockEvt = mock(MessageEvent.class);
|
||||
final Channel mockCh = mock(AbstractChannel.class);
|
||||
final ChannelPipeline mockPipeline = mock(ChannelPipeline.class);
|
||||
|
||||
// Mock HttpRequest and ChannelFuture
|
||||
final HttpRequest mockHttpRequest = createMockHttpRequest();
|
||||
@ -1087,16 +1135,16 @@ public void testSendMapCount() throws Exception {
|
||||
|
||||
// Mock Netty Channel Context and Channel behavior
|
||||
Mockito.doReturn(mockCh).when(mockCtx).getChannel();
|
||||
Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
|
||||
Mockito.when(mockPipeline.get(
|
||||
when(mockCh.getPipeline()).thenReturn(mockPipeline);
|
||||
when(mockPipeline.get(
|
||||
Mockito.any(String.class))).thenReturn(timerHandler);
|
||||
Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
|
||||
when(mockCtx.getChannel()).thenReturn(mockCh);
|
||||
Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
|
||||
Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
|
||||
when(mockCh.write(Object.class)).thenReturn(mockFuture);
|
||||
|
||||
//Mock MessageEvent behavior
|
||||
Mockito.doReturn(mockCh).when(mockEvt).getChannel();
|
||||
Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
|
||||
when(mockEvt.getChannel()).thenReturn(mockCh);
|
||||
Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
|
||||
|
||||
final ShuffleHandler sh = new MockShuffleHandler();
|
||||
@ -1120,8 +1168,8 @@ public void testSendMapCount() throws Exception {
|
||||
|
||||
public ChannelFuture createMockChannelFuture(Channel mockCh,
|
||||
final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
|
||||
final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class);
|
||||
Mockito.when(mockFuture.getChannel()).thenReturn(mockCh);
|
||||
final ChannelFuture mockFuture = mock(ChannelFuture.class);
|
||||
when(mockFuture.getChannel()).thenReturn(mockCh);
|
||||
Mockito.doReturn(true).when(mockFuture).isSuccess();
|
||||
Mockito.doAnswer(new Answer() {
|
||||
@Override
|
||||
@ -1139,7 +1187,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
}
|
||||
|
||||
public HttpRequest createMockHttpRequest() {
|
||||
HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class);
|
||||
HttpRequest mockHttpRequest = mock(HttpRequest.class);
|
||||
Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
|
||||
Mockito.doAnswer(new Answer() {
|
||||
@Override
|
||||
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** An Interface that can retrieve local directories to read from or write to.
|
||||
* Components can implement this interface to link it to
|
||||
* their own Directory Handler Service
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public interface AuxiliaryLocalPathHandler {
|
||||
/**
|
||||
* Get a path from the local FS for reading for a given Auxiliary Service.
|
||||
* @param path the requested path
|
||||
* @return the complete path to the file on a local disk
|
||||
* @throws IOException if the file read encounters a problem
|
||||
*/
|
||||
Path getLocalPathForRead(String path) throws IOException;
|
||||
|
||||
/**
|
||||
* Get a path from the local FS for writing for a given Auxiliary Service.
|
||||
* @param path the requested path
|
||||
* @return the complete path to the file on a local disk
|
||||
* @throws IOException if the path creations fails
|
||||
*/
|
||||
Path getLocalPathForWrite(String path) throws IOException;
|
||||
|
||||
/**
|
||||
* Get a path from the local FS for writing a file of an estimated size
|
||||
* for a given Auxiliary Service.
|
||||
* @param path the requested path
|
||||
* @param size the size of the file that is going to be written
|
||||
* @return the complete path to the file on a local disk
|
||||
* @throws IOException if the path creations fails
|
||||
*/
|
||||
Path getLocalPathForWrite(String path, long size) throws IOException;
|
||||
}
|
@ -40,6 +40,7 @@
|
||||
public abstract class AuxiliaryService extends AbstractService {
|
||||
|
||||
private Path recoveryPath = null;
|
||||
private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
|
||||
|
||||
protected AuxiliaryService(String name) {
|
||||
super(name);
|
||||
@ -123,4 +124,24 @@ public void stopContainer(ContainerTerminationContext stopContainerContext) {
|
||||
public void setRecoveryPath(Path recoveryPath) {
|
||||
this.recoveryPath = recoveryPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that gets the local dirs path handler for this Auxiliary Service.
|
||||
*
|
||||
* @return auxiliaryPathHandler object that is used to read from and write to
|
||||
* valid local Dirs.
|
||||
*/
|
||||
public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
|
||||
return this.auxiliaryLocalPathHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that sets the local dirs path handler for this Auxiliary Service.
|
||||
*
|
||||
* @param auxiliaryLocalPathHandler the pathHandler for this auxiliary service
|
||||
*/
|
||||
public void setAuxiliaryLocalPathHandler(
|
||||
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
|
||||
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
|
||||
}
|
||||
}
|
||||
|
@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size,
|
||||
checkWrite);
|
||||
}
|
||||
|
||||
public Path getLocalPathForRead(String pathStr) throws IOException {
|
||||
return getPathToRead(pathStr, getLocalDirsForRead());
|
||||
}
|
||||
|
||||
public Path getLogPathForWrite(String pathStr, boolean checkWrite)
|
||||
throws IOException {
|
||||
return logDirsAllocator.getLocalPathForWrite(pathStr,
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
@ -55,15 +56,17 @@ public class AuxServices extends AbstractService
|
||||
|
||||
protected final Map<String,AuxiliaryService> serviceMap;
|
||||
protected final Map<String,ByteBuffer> serviceMetaData;
|
||||
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
|
||||
|
||||
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
|
||||
|
||||
public AuxServices() {
|
||||
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
|
||||
super(AuxServices.class.getName());
|
||||
serviceMap =
|
||||
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
|
||||
serviceMetaData =
|
||||
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
|
||||
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
|
||||
// Obtain services from configuration in init()
|
||||
}
|
||||
|
||||
@ -134,6 +137,7 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
+"Service Meta Data may have issues unless the refer to "
|
||||
+"the name in the config.");
|
||||
}
|
||||
s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
|
||||
addService(sName, s);
|
||||
if (recoveryEnabled) {
|
||||
Path storePath = new Path(stateStoreRoot, sName);
|
||||
|
@ -42,6 +42,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -97,6 +98,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
@ -213,8 +215,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
|
||||
this.nodeStatusUpdater = nodeStatusUpdater;
|
||||
|
||||
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
|
||||
new AuxiliaryLocalPathHandlerImpl(dirsHandler);
|
||||
// Start configurable services
|
||||
auxiliaryServices = new AuxServices();
|
||||
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler);
|
||||
auxiliaryServices.registerServiceListener(this);
|
||||
addService(auxiliaryServices);
|
||||
|
||||
@ -1326,6 +1330,35 @@ public void handle(ApplicationEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements AuxiliaryLocalPathHandler.
|
||||
* It links NodeManager's LocalDirsHandlerService to the Auxiliary Services
|
||||
*/
|
||||
static class AuxiliaryLocalPathHandlerImpl
|
||||
implements AuxiliaryLocalPathHandler {
|
||||
private LocalDirsHandlerService dirhandlerService;
|
||||
AuxiliaryLocalPathHandlerImpl(
|
||||
LocalDirsHandlerService dirhandlerService) {
|
||||
this.dirhandlerService = dirhandlerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getLocalPathForRead(String path) throws IOException {
|
||||
return dirhandlerService.getLocalPathForRead(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getLocalPathForWrite(String path) throws IOException {
|
||||
return dirhandlerService.getLocalPathForWrite(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getLocalPathForWrite(String path, long size)
|
||||
throws IOException {
|
||||
return dirhandlerService.getLocalPathForWrite(path, size, false);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void handle(ContainerManagerEvent event) {
|
||||
|
@ -51,6 +51,7 @@
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
@ -59,6 +60,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestAuxServices {
|
||||
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
|
||||
@ -66,6 +68,8 @@ public class TestAuxServices {
|
||||
System.getProperty("test.build.data",
|
||||
System.getProperty("java.io.tmpdir")),
|
||||
TestAuxServices.class.getName());
|
||||
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
|
||||
Mockito.mock(AuxiliaryLocalPathHandler.class);
|
||||
|
||||
static class LightService extends AuxiliaryService implements Service
|
||||
{
|
||||
@ -160,7 +164,7 @@ public void testAuxEventDispatch() {
|
||||
ServiceB.class, Service.class);
|
||||
conf.setInt("A.expected.init", 1);
|
||||
conf.setInt("B.expected.stop", 1);
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
aux.init(conf);
|
||||
aux.start();
|
||||
|
||||
@ -224,7 +228,7 @@ public void testAuxServices() {
|
||||
ServiceA.class, Service.class);
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||
ServiceB.class, Service.class);
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
aux.init(conf);
|
||||
|
||||
int latch = 1;
|
||||
@ -236,8 +240,9 @@ public void testAuxServices() {
|
||||
}
|
||||
assertEquals("Invalid mix of services", 6, latch);
|
||||
aux.start();
|
||||
for (Service s : aux.getServices()) {
|
||||
for (AuxiliaryService s : aux.getServices()) {
|
||||
assertEquals(STARTED, s.getServiceState());
|
||||
assertEquals(s.getAuxiliaryLocalPathHandler(), MOCK_AUX_PATH_HANDLER);
|
||||
}
|
||||
|
||||
aux.stop();
|
||||
@ -255,7 +260,7 @@ public void testAuxServicesMeta() {
|
||||
ServiceA.class, Service.class);
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||
ServiceB.class, Service.class);
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
aux.init(conf);
|
||||
|
||||
int latch = 1;
|
||||
@ -292,7 +297,7 @@ public void testAuxUnexpectedStop() {
|
||||
ServiceA.class, Service.class);
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||
ServiceB.class, Service.class);
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
aux.init(conf);
|
||||
aux.start();
|
||||
|
||||
@ -305,7 +310,7 @@ public void testAuxUnexpectedStop() {
|
||||
|
||||
@Test
|
||||
public void testValidAuxServiceName() {
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
|
||||
@ -319,7 +324,7 @@ public void testValidAuxServiceName() {
|
||||
}
|
||||
|
||||
//Test bad auxService Name
|
||||
final AuxServices aux1 = new AuxServices();
|
||||
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
|
||||
ServiceA.class, Service.class);
|
||||
@ -345,7 +350,7 @@ public void testAuxServiceRecoverySetup() throws IOException {
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||
RecoverableServiceB.class, Service.class);
|
||||
try {
|
||||
final AuxServices aux = new AuxServices();
|
||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
||||
aux.init(conf);
|
||||
Assert.assertEquals(2, aux.getServices().size());
|
||||
File auxStorageDir = new File(TEST_DIR,
|
||||
|
@ -37,6 +37,7 @@
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
@ -98,10 +99,15 @@
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
@ -265,6 +271,41 @@ public void testContainerSetup() throws Exception {
|
||||
Assert.assertEquals(null, reader.readLine());
|
||||
}
|
||||
|
||||
@Test (timeout = 10000L)
|
||||
public void testAuxPathHandler() throws Exception {
|
||||
File testDir = GenericTestUtils.getTestDir(GenericTestUtils.getTestDir(
|
||||
TestContainerManager.class.getSimpleName() + "LocDir").
|
||||
getAbsolutePath());
|
||||
testDir.mkdirs();
|
||||
File testFile = new File(testDir, "test");
|
||||
testFile.createNewFile();
|
||||
YarnConfiguration configuration = new YarnConfiguration();
|
||||
configuration.set(YarnConfiguration.NM_LOCAL_DIRS,
|
||||
testDir.getAbsolutePath());
|
||||
LocalDirsHandlerService spyDirHandlerService =
|
||||
Mockito.spy(new LocalDirsHandlerService());
|
||||
spyDirHandlerService.init(configuration);
|
||||
when(spyDirHandlerService.getConfig()).thenReturn(configuration);
|
||||
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
|
||||
new ContainerManagerImpl.AuxiliaryLocalPathHandlerImpl(
|
||||
spyDirHandlerService);
|
||||
Path p = auxiliaryLocalPathHandler.getLocalPathForRead("test");
|
||||
assertTrue(p != null &&
|
||||
!spyDirHandlerService.getLocalDirsForRead().isEmpty());
|
||||
|
||||
when(spyDirHandlerService.getLocalDirsForRead()).thenReturn(
|
||||
new ArrayList<String>());
|
||||
try {
|
||||
auxiliaryLocalPathHandler.getLocalPathForRead("test");
|
||||
fail("Should not have passed!");
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("Could not find"));
|
||||
} finally {
|
||||
testFile.delete();
|
||||
testDir.delete();
|
||||
}
|
||||
}
|
||||
|
||||
//@Test
|
||||
public void testContainerLaunchAndStop() throws IOException,
|
||||
InterruptedException, YarnException {
|
||||
@ -908,8 +949,8 @@ public void testNullTokens() throws Exception {
|
||||
|
||||
ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl);
|
||||
UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
|
||||
Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
|
||||
Mockito.when(spyContainerMgr.
|
||||
when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
|
||||
when(spyContainerMgr.
|
||||
selectNMTokenIdentifier(ugInfo)).thenReturn(null);
|
||||
|
||||
strExceptionMsg = "";
|
||||
@ -1353,7 +1394,7 @@ public void testStartContainerFailureWithInvalidLocalResource()
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
ContainerLaunchContext spyContainerLaunchContext =
|
||||
Mockito.spy(containerLaunchContext);
|
||||
Mockito.when(spyContainerLaunchContext.getLocalResources())
|
||||
when(spyContainerLaunchContext.getLocalResources())
|
||||
.thenReturn(localResources);
|
||||
|
||||
ContainerId cId = createContainerId(0);
|
||||
@ -1398,7 +1439,7 @@ public void testStartContainerFailureWithNullTypeLocalResource()
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
ContainerLaunchContext spyContainerLaunchContext =
|
||||
Mockito.spy(containerLaunchContext);
|
||||
Mockito.when(spyContainerLaunchContext.getLocalResources())
|
||||
when(spyContainerLaunchContext.getLocalResources())
|
||||
.thenReturn(localResources);
|
||||
|
||||
ContainerId cId = createContainerId(0);
|
||||
@ -1443,7 +1484,7 @@ public void testStartContainerFailureWithNullVisibilityLocalResource()
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
ContainerLaunchContext spyContainerLaunchContext =
|
||||
Mockito.spy(containerLaunchContext);
|
||||
Mockito.when(spyContainerLaunchContext.getLocalResources())
|
||||
when(spyContainerLaunchContext.getLocalResources())
|
||||
.thenReturn(localResources);
|
||||
|
||||
ContainerId cId = createContainerId(0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user