YARN-9557. Application fails in diskchecker when ReadWriteDiskValidator is configured. Contributed by Bilwa S T.
(cherry picked from commit 5f8395f393
)
This commit is contained in:
parent
a59553b5e6
commit
5effeae1f3
|
@ -132,7 +132,7 @@ public class ContainerLocalizer {
|
||||||
this.localDirs = localDirs;
|
this.localDirs = localDirs;
|
||||||
this.localizerId = localizerId;
|
this.localizerId = localizerId;
|
||||||
this.recordFactory = recordFactory;
|
this.recordFactory = recordFactory;
|
||||||
this.conf = new YarnConfiguration();
|
this.conf = initConfiguration();
|
||||||
this.diskValidator = DiskValidatorFactory.getInstance(
|
this.diskValidator = DiskValidatorFactory.getInstance(
|
||||||
conf.get(YarnConfiguration.DISK_VALIDATOR,
|
conf.get(YarnConfiguration.DISK_VALIDATOR,
|
||||||
YarnConfiguration.DEFAULT_DISK_VALIDATOR));
|
YarnConfiguration.DEFAULT_DISK_VALIDATOR));
|
||||||
|
@ -142,6 +142,12 @@ public class ContainerLocalizer {
|
||||||
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@Private
|
||||||
|
Configuration initConfiguration() {
|
||||||
|
return new YarnConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
|
public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
|
||||||
|
@ -251,7 +257,8 @@ public class ContainerLocalizer {
|
||||||
if (rsrc.getVisibility() == LocalResourceVisibility.PRIVATE) {
|
if (rsrc.getVisibility() == LocalResourceVisibility.PRIVATE) {
|
||||||
createParentDirs(destDirPath);
|
createParentDirs(destDirPath);
|
||||||
}
|
}
|
||||||
diskValidator.checkStatus(new File(destDirPath.toUri().getRawPath()));
|
diskValidator
|
||||||
|
.checkStatus(new File(destDirPath.getParent().toUri().getRawPath()));
|
||||||
return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc);
|
return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,12 +68,14 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -239,6 +241,32 @@ public class TestContainerLocalizer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiskCheckFailure() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.DISK_VALIDATOR, "read-write");
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext(conf);
|
||||||
|
Path fileCacheDir = lfs.makeQualified(new Path(basedir, "filecache"));
|
||||||
|
lfs.mkdir(fileCacheDir, FsPermission.getDefault(), true);
|
||||||
|
RecordFactory recordFactory = mock(RecordFactory.class);
|
||||||
|
ContainerLocalizer localizer = new ContainerLocalizer(lfs,
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName(), "application_01",
|
||||||
|
"container_01", new ArrayList<>(), recordFactory) {
|
||||||
|
@Override
|
||||||
|
Configuration initConfiguration() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
LocalResource rsrc = mock(LocalResource.class);
|
||||||
|
Path destDirPath = new Path(fileCacheDir, "11");
|
||||||
|
try {
|
||||||
|
localizer.download(destDirPath, rsrc,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
} catch (DiskErrorException ex) {
|
||||||
|
fail(ex.getCause().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue