Fixed more FilesSystemRMStateStore issues. Contributed by Vinod Kumar Vavilapalli.
(cherry picked from commit 9acd24fec4
)
This commit is contained in:
parent
33648268ce
commit
148412bb30
|
@ -268,6 +268,18 @@ public class YarnConfiguration extends Configuration {
|
||||||
/** ACL used in case none is found. Allows nothing. */
|
/** ACL used in case none is found. Allows nothing. */
|
||||||
public static final String DEFAULT_YARN_APP_ACL = " ";
|
public static final String DEFAULT_YARN_APP_ACL = " ";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable/disable intermediate-data encryption at YARN level. For now, this
|
||||||
|
* only is used by the FileSystemRMStateStore to setup right file-system
|
||||||
|
* security attributes.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public static final String YARN_INTERMEDIATE_DATA_ENCRYPTION = YARN_PREFIX
|
||||||
|
+ "intermediate-data-encryption.enable";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final Boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false;
|
||||||
|
|
||||||
/** The address of the RM admin interface.*/
|
/** The address of the RM admin interface.*/
|
||||||
public static final String RM_ADMIN_ADDRESS =
|
public static final String RM_ADMIN_ADDRESS =
|
||||||
RM_PREFIX + "admin.address";
|
RM_PREFIX + "admin.address";
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -100,7 +101,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
private Path dtSequenceNumberPath = null;
|
private Path dtSequenceNumberPath = null;
|
||||||
private int fsNumRetries;
|
private int fsNumRetries;
|
||||||
private long fsRetryInterval;
|
private long fsRetryInterval;
|
||||||
private volatile boolean isHDFS;
|
private boolean intermediateEncryptionEnabled =
|
||||||
|
YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Path fsWorkingPath;
|
Path fsWorkingPath;
|
||||||
|
@ -121,6 +123,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
fsRetryInterval =
|
fsRetryInterval =
|
||||||
conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
|
||||||
|
intermediateEncryptionEnabled =
|
||||||
|
conf.getBoolean(YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION,
|
||||||
|
YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -145,17 +150,11 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
fs = fsWorkingPath.getFileSystem(fsConf);
|
fs = fsWorkingPath.getFileSystem(fsConf);
|
||||||
isHDFS = fs.getScheme().toLowerCase().contains("hdfs");
|
|
||||||
mkdirsWithRetries(rmDTSecretManagerRoot);
|
mkdirsWithRetries(rmDTSecretManagerRoot);
|
||||||
mkdirsWithRetries(rmAppRoot);
|
mkdirsWithRetries(rmAppRoot);
|
||||||
mkdirsWithRetries(amrmTokenSecretManagerRoot);
|
mkdirsWithRetries(amrmTokenSecretManagerRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
void setIsHDFS(boolean isHDFS) {
|
|
||||||
this.isHDFS = isHDFS;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void closeInternal() throws Exception {
|
protected synchronized void closeInternal() throws Exception {
|
||||||
closeWithRetries();
|
closeWithRetries();
|
||||||
|
@ -856,8 +855,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
private void setUnreadableBySuperuserXattrib(Path p)
|
private void setUnreadableBySuperuserXattrib(Path p)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (isHDFS &&
|
if (fs.getScheme().toLowerCase().contains("hdfs")
|
||||||
!fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
|
&& intermediateEncryptionEnabled
|
||||||
|
&& !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
|
||||||
fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
|
fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
|
||||||
EnumSet.of(XAttrSetFlag.CREATE));
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,11 +26,6 @@ import java.util.LinkedList;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -38,17 +33,22 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestFSRMStateStore extends RMStateStoreTestBase {
|
public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
|
@ -111,6 +111,10 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
||||||
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
900L);
|
900L);
|
||||||
|
if (adminCheckEnable) {
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
|
||||||
|
}
|
||||||
this.store = new TestFileSystemRMStore(conf);
|
this.store = new TestFileSystemRMStore(conf);
|
||||||
Assert.assertEquals(store.getNumRetries(), 8);
|
Assert.assertEquals(store.getNumRetries(), 8);
|
||||||
Assert.assertEquals(store.getRetryInterval(), 900L);
|
Assert.assertEquals(store.getRetryInterval(), 900L);
|
||||||
|
@ -119,11 +123,6 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
store.startInternal();
|
store.startInternal();
|
||||||
Assert.assertTrue(store.fs != previousFs);
|
Assert.assertTrue(store.fs != previousFs);
|
||||||
Assert.assertTrue(store.fs.getConf() == store.fsConf);
|
Assert.assertTrue(store.fs.getConf() == store.fsConf);
|
||||||
if (adminCheckEnable) {
|
|
||||||
store.setIsHDFS(true);
|
|
||||||
} else {
|
|
||||||
store.setIsHDFS(false);
|
|
||||||
}
|
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue