Merge trunk into branch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1613007 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
5149a8a6f1
|
@ -452,6 +452,11 @@ Release 2.6.0 - UNRELEASED
|
|||
|
||||
HADOOP-10855. Allow Text to be read with a known Length. (todd)
|
||||
|
||||
HADOOP-10887. Add XAttrs to ViewFs and make XAttrs + ViewFileSystem
|
||||
internal dir behavior consistent. (Stephen Chu via wang)
|
||||
|
||||
HADOOP-10882. Move DirectBufferPool into common util. (todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -480,6 +485,9 @@ Release 2.6.0 - UNRELEASED
|
|||
command when the format of the stat command uses non-curly quotes (yzhang
|
||||
via cmccabe)
|
||||
|
||||
HADOOP-10830. Missing lock in JavaKeyStoreProvider.createCredentialEntry.
|
||||
(Benoy Antony via umamahesh)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -805,6 +813,12 @@ Release 2.5.0 - UNRELEASED
|
|||
HADOOP-10864. Tool documentenation is broken. (Akira Ajisaka
|
||||
via Arpit Agarwal)
|
||||
|
||||
HADOOP-10872. TestPathData fails intermittently with "Mkdirs failed
|
||||
to create d1". (Yongjun Zhang via Arpit Agarwal)
|
||||
|
||||
HADOOP-10890. TestDFVariations.testMount fails intermittently. (Yongjun
|
||||
Zhang via Arpit Agarwal)
|
||||
|
||||
Release 2.4.1 - 2014-06-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -437,7 +437,9 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
|
|||
throw new FileNotFoundException("Parent directory doesn't exist: "
|
||||
+ parent);
|
||||
} else if (!mkdirs(parent)) {
|
||||
throw new IOException("Mkdirs failed to create " + parent);
|
||||
throw new IOException("Mkdirs failed to create " + parent
|
||||
+ " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
|
||||
+ ")");
|
||||
}
|
||||
}
|
||||
final FSDataOutputStream out;
|
||||
|
|
|
@ -2484,4 +2484,33 @@ public final class FileContext {
|
|||
}
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of the xattr names for a file or directory.
|
||||
* Only those xattr names which the logged-in user has permissions to view
|
||||
* are returned.
|
||||
* <p/>
|
||||
* A regular user can only get xattr names for the "user" namespace.
|
||||
* The super user can only get xattr names for "user" and "trusted"
|
||||
* namespaces.
|
||||
* The xattrs of the "security" and "system" namespaces are only
|
||||
* used/exposed internally by/to the FS impl.
|
||||
* <p/>
|
||||
* @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
|
||||
* http://en.wikipedia.org/wiki/Extended_file_attributes</a>
|
||||
*
|
||||
* @param path Path to get extended attributes
|
||||
* @return List<String> of the XAttr names of the file or directory
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
final Path absF = fixRelativePart(path);
|
||||
return new FSLinkResolver<List<String>>() {
|
||||
@Override
|
||||
public List<String> next(final AbstractFileSystem fs, final Path p)
|
||||
throws IOException {
|
||||
return fs.listXAttrs(p);
|
||||
}
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2509,7 +2509,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
* http://en.wikipedia.org/wiki/Extended_file_attributes</a>
|
||||
*
|
||||
* @param path Path to get extended attributes
|
||||
* @return Map<String, byte[]> describing the XAttrs of the file or directory
|
||||
* @return List<String> of the XAttr names of the file or directory
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FsStatus;
|
|||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -313,6 +315,38 @@ class ChRootedFs extends AbstractFileSystem {
|
|||
return myFs.getAclStatus(fullPath(path));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setXAttr(Path path, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
myFs.setXAttr(fullPath(path), name, value, flag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path path, String name) throws IOException {
|
||||
return myFs.getXAttr(fullPath(path), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
|
||||
return myFs.getXAttrs(fullPath(path));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
|
||||
throws IOException {
|
||||
return myFs.getXAttrs(fullPath(path), names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
return myFs.listXAttrs(fullPath(path));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeXAttr(Path path, String name) throws IOException {
|
||||
myFs.removeXAttr(fullPath(path), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVerifyChecksum(final boolean verifyChecksum)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
|
|
|
@ -913,5 +913,39 @@ public class ViewFileSystem extends FileSystem {
|
|||
.addEntries(AclUtil.getMinimalAcl(PERMISSION_555))
|
||||
.stickyBit(false).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setXAttr(Path path, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
checkPathIsSlash(path);
|
||||
throw readOnlyMountTable("setXAttr", path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path path, String name) throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttr");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
|
||||
throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
throw new NotInMountpointException(path, "listXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeXAttr(Path path, String name) throws IOException {
|
||||
checkPathIsSlash(path);
|
||||
throw readOnlyMountTable("removeXAttr", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.net.URISyntaxException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -48,6 +49,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.fs.local.LocalConfigKeys;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
|
@ -652,6 +654,50 @@ public class ViewFs extends AbstractFileSystem {
|
|||
return res.targetFileSystem.getAclStatus(res.remainingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setXAttr(Path path, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
res.targetFileSystem.setXAttr(res.remainingPath, name, value, flag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path path, String name) throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
return res.targetFileSystem.getXAttr(res.remainingPath, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
return res.targetFileSystem.getXAttrs(res.remainingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
|
||||
throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
return res.targetFileSystem.getXAttrs(res.remainingPath, names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
return res.targetFileSystem.listXAttrs(res.remainingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeXAttr(Path path, String name) throws IOException {
|
||||
InodeTree.ResolveResult<AbstractFileSystem> res =
|
||||
fsState.resolve(getUriPath(path), true);
|
||||
res.targetFileSystem.removeXAttr(res.remainingPath, name);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* An instance of this class represents an internal dir of the viewFs
|
||||
|
@ -921,5 +967,39 @@ public class ViewFs extends AbstractFileSystem {
|
|||
.addEntries(AclUtil.getMinimalAcl(PERMISSION_555))
|
||||
.stickyBit(false).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setXAttr(Path path, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
checkPathIsSlash(path);
|
||||
throw readOnlyMountTable("setXAttr", path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path path, String name) throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttr");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
|
||||
throws IOException {
|
||||
throw new NotInMountpointException(path, "getXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listXAttrs(Path path) throws IOException {
|
||||
throw new NotInMountpointException(path, "listXAttrs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeXAttr(Path path, String name) throws IOException {
|
||||
checkPathIsSlash(path);
|
||||
throw readOnlyMountTable("removeXAttr", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -194,15 +194,18 @@ public class JavaKeyStoreProvider extends CredentialProvider {
|
|||
@Override
|
||||
public CredentialEntry createCredentialEntry(String alias, char[] credential)
|
||||
throws IOException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (keyStore.containsAlias(alias) || cache.containsKey(alias)) {
|
||||
throw new IOException("Credential " + alias + " already exists in " + this);
|
||||
}
|
||||
return innerSetCredential(alias, credential);
|
||||
} catch (KeyStoreException e) {
|
||||
throw new IOException("Problem looking up credential " + alias + " in " + this,
|
||||
e);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return innerSetCredential(alias, credential);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* A simple class for pooling direct ByteBuffers. This is necessary
|
||||
|
@ -40,7 +41,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* allocated at the same size. There is no attempt to reuse larger
|
||||
* buffers to satisfy smaller allocations.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public class DirectBufferPool {
|
||||
|
||||
// Essentially implement a multimap with weak values.
|
|
@ -29,14 +29,33 @@ import java.util.Random;
|
|||
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestDFVariations {
|
||||
private static final String TEST_ROOT_DIR =
|
||||
System.getProperty("test.build.data","build/test/data") + "/TestDFVariations";
|
||||
private static File test_root = null;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
test_root = new File(TEST_ROOT_DIR);
|
||||
test_root.mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws IOException {
|
||||
FileUtil.setWritable(test_root, true);
|
||||
FileUtil.fullyDelete(test_root);
|
||||
assertTrue(!test_root.exists());
|
||||
}
|
||||
|
||||
public static class XXDF extends DF {
|
||||
public XXDF() throws IOException {
|
||||
super(new File(System.getProperty("test.build.data","/tmp")), 0L);
|
||||
super(test_root, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
public class TestPathData {
|
||||
private static final String TEST_ROOT_DIR =
|
||||
System.getProperty("test.build.data","build/test/data") + "/testPD";
|
||||
protected Configuration conf;
|
||||
protected FileSystem fs;
|
||||
protected Path testDir;
|
||||
|
@ -43,11 +45,12 @@ public class TestPathData {
|
|||
public void initialize() throws Exception {
|
||||
conf = new Configuration();
|
||||
fs = FileSystem.getLocal(conf);
|
||||
testDir = new Path(
|
||||
System.getProperty("test.build.data", "build/test/data") + "/testPD"
|
||||
);
|
||||
testDir = new Path(TEST_ROOT_DIR);
|
||||
|
||||
// don't want scheme on the path, just an absolute path
|
||||
testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
|
||||
fs.mkdirs(testDir);
|
||||
|
||||
FileSystem.setDefaultUri(conf, fs.getUri());
|
||||
fs.setWorkingDirectory(testDir);
|
||||
fs.mkdirs(new Path("d1"));
|
||||
|
@ -60,6 +63,7 @@ public class TestPathData {
|
|||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
fs.delete(testDir, true);
|
||||
fs.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -773,4 +773,34 @@ public class ViewFileSystemBaseTest {
|
|||
assertFalse(aclStatus.isStickyBit());
|
||||
}
|
||||
|
||||
@Test(expected=AccessControlException.class)
|
||||
public void testInternalSetXAttr() throws IOException {
|
||||
fsView.setXAttr(new Path("/internalDir"), "xattrName", null);
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttr() throws IOException {
|
||||
fsView.getXAttr(new Path("/internalDir"), "xattrName");
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttrs() throws IOException {
|
||||
fsView.getXAttrs(new Path("/internalDir"));
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttrsWithNames() throws IOException {
|
||||
fsView.getXAttrs(new Path("/internalDir"), new ArrayList<String>());
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalListXAttr() throws IOException {
|
||||
fsView.listXAttrs(new Path("/internalDir"));
|
||||
}
|
||||
|
||||
@Test(expected=AccessControlException.class)
|
||||
public void testInternalRemoveXAttr() throws IOException {
|
||||
fsView.removeXAttr(new Path("/internalDir"), "xattrName");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -747,4 +747,34 @@ public class ViewFsBaseTest {
|
|||
AclUtil.getMinimalAcl(PERMISSION_555));
|
||||
assertFalse(aclStatus.isStickyBit());
|
||||
}
|
||||
|
||||
@Test(expected=AccessControlException.class)
|
||||
public void testInternalSetXAttr() throws IOException {
|
||||
fcView.setXAttr(new Path("/internalDir"), "xattrName", null);
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttr() throws IOException {
|
||||
fcView.getXAttr(new Path("/internalDir"), "xattrName");
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttrs() throws IOException {
|
||||
fcView.getXAttrs(new Path("/internalDir"));
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalGetXAttrsWithNames() throws IOException {
|
||||
fcView.getXAttrs(new Path("/internalDir"), new ArrayList<String>());
|
||||
}
|
||||
|
||||
@Test(expected=NotInMountpointException.class)
|
||||
public void testInternalListXAttr() throws IOException {
|
||||
fcView.listXAttrs(new Path("/internalDir"));
|
||||
}
|
||||
|
||||
@Test(expected=AccessControlException.class)
|
||||
public void testInternalRemoveXAttr() throws IOException {
|
||||
fcView.removeXAttr(new Path("/internalDir"), "xattrName");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
|
@ -29,7 +29,7 @@ import org.junit.Test;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
public class TestDirectBufferPool {
|
||||
final DirectBufferPool pool = new DirectBufferPool();
|
||||
final org.apache.hadoop.util.DirectBufferPool pool = new org.apache.hadoop.util.DirectBufferPool();
|
||||
|
||||
@Test
|
||||
public void testBasics() {
|
|
@ -53,7 +53,12 @@ public class NfsExports {
|
|||
long expirationPeriodNano = conf.getLong(
|
||||
Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY,
|
||||
Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT) * 1000 * 1000;
|
||||
try {
|
||||
exports = new NfsExports(cacheSize, expirationPeriodNano, matchHosts);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Invalid NFS Exports provided: ", e);
|
||||
return exports;
|
||||
}
|
||||
}
|
||||
return exports;
|
||||
}
|
||||
|
|
|
@ -104,6 +104,10 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
|||
|
||||
@Override
|
||||
public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
|
||||
if (hostsMatcher == null) {
|
||||
return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
|
||||
null);
|
||||
}
|
||||
AccessPrivilege accessPrivilege = hostsMatcher.getAccessPrivilege(client);
|
||||
if (accessPrivilege == AccessPrivilege.NONE) {
|
||||
return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
|
||||
|
@ -210,8 +214,15 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
|||
} else if (mntproc == MNTPROC.EXPORT) {
|
||||
// Currently only support one NFS export
|
||||
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
|
||||
if (hostsMatcher != null) {
|
||||
hostsMatchers.add(hostsMatcher);
|
||||
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
|
||||
} else {
|
||||
// This means there are no valid exports provided.
|
||||
RpcAcceptedReply.getInstance(xid,
|
||||
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
|
||||
out);
|
||||
}
|
||||
} else {
|
||||
// Invalid procedure
|
||||
RpcAcceptedReply.getInstance(xid,
|
||||
|
|
|
@ -2125,6 +2125,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||
}
|
||||
|
||||
// Check export table
|
||||
if (exports == null) {
|
||||
return false;
|
||||
}
|
||||
InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
|
||||
AccessPrivilege access = exports.getAccessPrivilege(client);
|
||||
if (access == AccessPrivilege.NONE) {
|
||||
|
|
|
@ -357,6 +357,13 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6731. Run "hdfs zkfc-formatZK" on a server in a non-namenode will cause
|
||||
a null pointer exception. (Masatake Iwasaki via brandonli)
|
||||
|
||||
HDFS-6114. Block Scan log rolling will never happen if blocks written
|
||||
continuously leading to huge size of dncp_block_verification.log.curr
|
||||
(vinayakumarb via cmccabe)
|
||||
|
||||
HDFS-6455. NFS: Exception should be added in NFS log for invalid separator in
|
||||
nfs.exports.allowed.hosts. (Abhiraj Butala via brandonli)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -892,6 +899,9 @@ Release 2.5.0 - UNRELEASED
|
|||
HDFS-6703. NFS: Files can be deleted from a read-only mount
|
||||
(Srikanth Upputuri via brandonli)
|
||||
|
||||
HDFS-6422. getfattr in CLI doesn't throw exception or return non-0 return code
|
||||
when xattr doesn't exist. (Charles Lamb via umamahesh)
|
||||
|
||||
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
|||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.util.DirectBufferPool;
|
||||
import org.apache.hadoop.util.DirectBufferPool;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||
import org.apache.hadoop.hdfs.util.DirectBufferPool;
|
||||
import org.apache.hadoop.util.DirectBufferPool;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
|
|
@ -1337,6 +1337,6 @@ public interface ClientProtocol {
|
|||
* @param xAttr <code>XAttr</code> to remove
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
@AtMostOnce
|
||||
public void removeXAttr(String src, XAttr xAttr) throws IOException;
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteChannel;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.util.DirectBufferPool;
|
||||
import org.apache.hadoop.util.DirectBufferPool;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
|
|
@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private final SortedSet<BlockScanInfo> blockInfoSet
|
||||
= new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
|
||||
|
||||
private final SortedSet<BlockScanInfo> newBlockInfoSet =
|
||||
new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
|
||||
|
||||
private final GSet<Block, BlockScanInfo> blockMap
|
||||
= new LightWeightGSet<Block, BlockScanInfo>(
|
||||
LightWeightGSet.computeCapacity(0.5, "BlockMap"));
|
||||
|
@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
|
|||
BlockScanInfo info = new BlockScanInfo( block );
|
||||
info.lastScanTime = scanTime--;
|
||||
//still keep 'info.lastScanType' to NONE.
|
||||
addBlockInfo(info);
|
||||
addBlockInfo(info, false);
|
||||
}
|
||||
|
||||
RollingLogs rollingLogs = null;
|
||||
|
@ -222,8 +226,22 @@ class BlockPoolSliceScanner {
|
|||
// not really required.
|
||||
}
|
||||
|
||||
private synchronized void addBlockInfo(BlockScanInfo info) {
|
||||
boolean added = blockInfoSet.add(info);
|
||||
/**
|
||||
* Add the BlockScanInfo to sorted set of blockScanInfo
|
||||
* @param info BlockScanInfo to be added
|
||||
* @param isNewBlock true if the block is the new Block, false if
|
||||
* BlockScanInfo is being updated with new scanTime
|
||||
*/
|
||||
private synchronized void addBlockInfo(BlockScanInfo info,
|
||||
boolean isNewBlock) {
|
||||
boolean added = false;
|
||||
if (isNewBlock) {
|
||||
// check whether the block already present
|
||||
boolean exists = blockInfoSet.contains(info);
|
||||
added = !exists && newBlockInfoSet.add(info);
|
||||
} else {
|
||||
added = blockInfoSet.add(info);
|
||||
}
|
||||
blockMap.put(info);
|
||||
|
||||
if (added) {
|
||||
|
@ -233,6 +251,9 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private synchronized void delBlockInfo(BlockScanInfo info) {
|
||||
boolean exists = blockInfoSet.remove(info);
|
||||
if (!exists){
|
||||
exists = newBlockInfoSet.remove(info);
|
||||
}
|
||||
blockMap.remove(info);
|
||||
|
||||
if (exists) {
|
||||
|
@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
|
|||
delBlockInfo(info);
|
||||
info.lastScanTime = e.verificationTime;
|
||||
info.lastScanType = ScanType.VERIFICATION_SCAN;
|
||||
addBlockInfo(info);
|
||||
addBlockInfo(info, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,7 +296,7 @@ class BlockPoolSliceScanner {
|
|||
info = new BlockScanInfo(block.getLocalBlock());
|
||||
info.lastScanTime = getNewBlockScanTime();
|
||||
|
||||
addBlockInfo(info);
|
||||
addBlockInfo(info, true);
|
||||
adjustThrottler();
|
||||
}
|
||||
|
||||
|
@ -319,7 +340,7 @@ class BlockPoolSliceScanner {
|
|||
info.lastScanType = type;
|
||||
info.lastScanTime = now;
|
||||
info.lastScanOk = scanOk;
|
||||
addBlockInfo(info);
|
||||
addBlockInfo(info, false);
|
||||
|
||||
// Don't update meta data if the verification failed.
|
||||
if (!scanOk) {
|
||||
|
@ -578,7 +599,7 @@ class BlockPoolSliceScanner {
|
|||
delBlockInfo(info);
|
||||
info.lastScanTime = lastScanTime;
|
||||
lastScanTime += verifyInterval;
|
||||
addBlockInfo(info);
|
||||
addBlockInfo(info, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -674,12 +695,21 @@ class BlockPoolSliceScanner {
|
|||
throw e;
|
||||
} finally {
|
||||
rollVerificationLogs();
|
||||
rollNewBlocksInfo();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Done scanning block pool: " + blockPoolId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add new blocks to scan in next iteration
|
||||
private synchronized void rollNewBlocksInfo() {
|
||||
for (BlockScanInfo newBlock : newBlockInfoSet) {
|
||||
blockInfoSet.add(newBlock);
|
||||
}
|
||||
newBlockInfoSet.clear();
|
||||
}
|
||||
|
||||
private synchronized void rollVerificationLogs() {
|
||||
if (verificationLog != null) {
|
||||
try {
|
||||
|
|
|
@ -1074,10 +1074,11 @@ public class FSEditLog implements LogsPurgeable {
|
|||
logEdit(op);
|
||||
}
|
||||
|
||||
void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
|
||||
void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
|
||||
final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
|
||||
op.src = src;
|
||||
op.xAttrs = xAttrs;
|
||||
logRpcIds(op, toLogRpcIds);
|
||||
logEdit(op);
|
||||
}
|
||||
|
||||
|
|
|
@ -821,6 +821,10 @@ public class FSEditLogLoader {
|
|||
RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
|
||||
fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
|
||||
removeXAttrOp.xAttrs);
|
||||
if (toAddRetryCache) {
|
||||
fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
|
||||
removeXAttrOp.rpcCallId);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
|
|
@ -3551,6 +3551,7 @@ public abstract class FSEditLogOp {
|
|||
XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
|
||||
src = p.getSrc();
|
||||
xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
|
||||
readRpcIds(in, logVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3561,18 +3562,22 @@ public abstract class FSEditLogOp {
|
|||
}
|
||||
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
|
||||
b.build().writeDelimitedTo(out);
|
||||
// clientId and callId
|
||||
writeRpcIds(rpcClientId, rpcCallId, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
||||
XMLUtils.addSaxString(contentHandler, "SRC", src);
|
||||
appendXAttrsToXml(contentHandler, xAttrs);
|
||||
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
||||
}
|
||||
|
||||
@Override
|
||||
void fromXml(Stanza st) throws InvalidXmlException {
|
||||
src = st.getValue("SRC");
|
||||
xAttrs = readXAttrsFromXml(st);
|
||||
readRpcIdsFromXml(st);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8279,11 +8279,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
nnConf.checkXAttrsConfigFlag();
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
boolean getAll = xAttrs == null || xAttrs.isEmpty();
|
||||
List<XAttr> filteredXAttrs = null;
|
||||
if (!getAll) {
|
||||
filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs);
|
||||
if (filteredXAttrs.isEmpty()) {
|
||||
return filteredXAttrs;
|
||||
try {
|
||||
XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "getXAttrs", src);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
checkOperation(OperationCategory.READ);
|
||||
|
@ -8302,15 +8303,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (filteredAll == null || filteredAll.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size());
|
||||
for (XAttr xAttr : filteredXAttrs) {
|
||||
List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
|
||||
for (XAttr xAttr : xAttrs) {
|
||||
boolean foundIt = false;
|
||||
for (XAttr a : filteredAll) {
|
||||
if (xAttr.getNameSpace() == a.getNameSpace()
|
||||
&& xAttr.getName().equals(a.getName())) {
|
||||
toGet.add(a);
|
||||
foundIt = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!foundIt) {
|
||||
throw new IOException(
|
||||
"At least one of the attributes provided was not found.");
|
||||
}
|
||||
}
|
||||
return toGet;
|
||||
}
|
||||
|
@ -8345,16 +8352,41 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an xattr for a file or directory.
|
||||
*
|
||||
* @param src
|
||||
* - path to remove the xattr from
|
||||
* @param xAttr
|
||||
* - xAttr to remove
|
||||
* @throws AccessControlException
|
||||
* @throws SafeModeException
|
||||
* @throws UnresolvedLinkException
|
||||
* @throws IOException
|
||||
*/
|
||||
void removeXAttr(String src, XAttr xAttr) throws IOException {
|
||||
nnConf.checkXAttrsConfigFlag();
|
||||
HdfsFileStatus resultingStat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
|
||||
removeXAttrInt(src, xAttr, cacheEntry != null);
|
||||
success = true;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "removeXAttr", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache)
|
||||
throws IOException {
|
||||
nnConf.checkXAttrsConfigFlag();
|
||||
HdfsFileStatus resultingStat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
writeLock();
|
||||
|
@ -8368,12 +8400,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
xAttrs.add(xAttr);
|
||||
List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
|
||||
if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
|
||||
getEditLog().logRemoveXAttrs(src, removedXAttrs);
|
||||
getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
|
||||
} else {
|
||||
throw new IOException(
|
||||
"No matching attributes found for remove operation");
|
||||
}
|
||||
resultingStat = getAuditFileInfo(src, false);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "removeXAttr", src);
|
||||
throw e;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.XAttrHelper;
|
|||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* There are four types of extended attributes <XAttr> defined by the
|
||||
|
@ -61,6 +62,18 @@ public class XAttrPermissionFilter {
|
|||
+ XAttrHelper.getPrefixName(xAttr));
|
||||
}
|
||||
|
||||
static void checkPermissionForApi(FSPermissionChecker pc,
|
||||
List<XAttr> xAttrs) throws AccessControlException {
|
||||
Preconditions.checkArgument(xAttrs != null);
|
||||
if (xAttrs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (XAttr xAttr : xAttrs) {
|
||||
checkPermissionForApi(pc, xAttr);
|
||||
}
|
||||
}
|
||||
|
||||
static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
|
||||
List<XAttr> xAttrs) {
|
||||
assert xAttrs != null : "xAttrs can not be null";
|
||||
|
|
|
@ -26,7 +26,7 @@ public class XAttrNameParam extends StringParam {
|
|||
public static final String DEFAULT = "";
|
||||
|
||||
private static Domain DOMAIN = new Domain(NAME,
|
||||
Pattern.compile("^(user\\.|trusted\\.|system\\.|security\\.).+"));
|
||||
Pattern.compile(".*"));
|
||||
|
||||
public XAttrNameParam(final String str) {
|
||||
super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);
|
||||
|
|
|
@ -2653,6 +2653,75 @@ public class TestDFSShell {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* 1. Test that CLI throws an exception and returns non-0 when user does
|
||||
* not have permission to read an xattr.
|
||||
* 2. Test that CLI throws an exception and returns non-0 when a non-existent
|
||||
* xattr is requested.
|
||||
*/
|
||||
@Test (timeout = 120000)
|
||||
public void testGetFAttrErrors() throws Exception {
|
||||
final UserGroupInformation user = UserGroupInformation.
|
||||
createUserForTesting("user", new String[] {"mygroup"});
|
||||
MiniDFSCluster cluster = null;
|
||||
PrintStream bakErr = null;
|
||||
try {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
final Path p = new Path("/foo");
|
||||
fs.mkdirs(p);
|
||||
bakErr = System.err;
|
||||
|
||||
final FsShell fshell = new FsShell(conf);
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
// No permission for "other".
|
||||
fs.setPermission(p, new FsPermission((short) 0700));
|
||||
|
||||
{
|
||||
final int ret = ToolRunner.run(fshell, new String[] {
|
||||
"-setfattr", "-n", "user.a1", "-v", "1234", "/foo"});
|
||||
assertEquals("Returned should be 0", 0, ret);
|
||||
out.reset();
|
||||
}
|
||||
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
int ret = ToolRunner.run(fshell, new String[] {
|
||||
"-getfattr", "-n", "user.a1", "/foo"});
|
||||
String str = out.toString();
|
||||
assertTrue("xattr value was incorrectly returned",
|
||||
str.indexOf("1234") == -1);
|
||||
out.reset();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
{
|
||||
final int ret = ToolRunner.run(fshell, new String[]{
|
||||
"-getfattr", "-n", "user.nonexistent", "/foo"});
|
||||
String str = out.toString();
|
||||
assertTrue("xattr value was incorrectly returned",
|
||||
str.indexOf(
|
||||
"getfattr: At least one of the attributes provided was not found")
|
||||
>= 0);
|
||||
out.reset();
|
||||
}
|
||||
} finally {
|
||||
if (bakErr != null) {
|
||||
System.setErr(bakErr);
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the server trash configuration is respected when
|
||||
* the client configuration is not set.
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
@ -46,6 +45,7 @@ import static org.apache.hadoop.fs.permission.AclEntryType.USER;
|
|||
import static org.apache.hadoop.fs.permission.FsAction.ALL;
|
||||
import static org.apache.hadoop.fs.permission.FsAction.READ;
|
||||
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import org.junit.After;
|
||||
|
@ -261,11 +261,12 @@ public class FSXAttrBaseTest {
|
|||
fs.setXAttr(path, "user.", value1, EnumSet.of(XAttrSetFlag.CREATE,
|
||||
XAttrSetFlag.REPLACE));
|
||||
Assert.fail("Setting xattr with empty name should fail.");
|
||||
} catch (RemoteException e) {
|
||||
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
|
||||
HadoopIllegalArgumentException.class.getCanonicalName());
|
||||
GenericTestUtils.assertExceptionContains("XAttr name cannot be empty", e);
|
||||
} catch (HadoopIllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("XAttr name cannot be empty", e);
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("Invalid value: \"user.\" does " +
|
||||
"not belong to the domain ^(user\\.|trusted\\.|system\\.|security\\.).+", e);
|
||||
}
|
||||
|
||||
// Set xattr with invalid name: "a1"
|
||||
|
@ -274,11 +275,12 @@ public class FSXAttrBaseTest {
|
|||
XAttrSetFlag.REPLACE));
|
||||
Assert.fail("Setting xattr with invalid name prefix or without " +
|
||||
"name prefix should fail.");
|
||||
} catch (RemoteException e) {
|
||||
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
|
||||
HadoopIllegalArgumentException.class.getCanonicalName());
|
||||
GenericTestUtils.assertExceptionContains("XAttr name must be prefixed", e);
|
||||
} catch (HadoopIllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("XAttr name must be prefixed", e);
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("Invalid value: \"a1\" does " +
|
||||
"not belong to the domain ^(user\\.|trusted\\.|system\\.|security\\.).+", e);
|
||||
}
|
||||
|
||||
// Set xattr without XAttrSetFlag
|
||||
|
@ -341,9 +343,18 @@ public class FSXAttrBaseTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests for getting xattr
|
||||
* 1. To get xattr which does not exist.
|
||||
* 2. To get multiple xattrs.
|
||||
* getxattr tests. Test that getxattr throws an exception if any of
|
||||
* the following are true:
|
||||
* an xattr that was requested doesn't exist
|
||||
* the caller specifies an unknown namespace
|
||||
* the caller doesn't have access to the namespace
|
||||
* the caller doesn't have permission to get the value of the xattr
|
||||
* the caller does not have search access to the parent directory
|
||||
* the caller has only read access to the owning directory
|
||||
* the caller has only search access to the owning directory and
|
||||
* execute/search access to the actual entity
|
||||
* the caller does not have search access to the owning directory and read
|
||||
* access to the actual entity
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testGetXAttrs() throws Exception {
|
||||
|
@ -351,21 +362,159 @@ public class FSXAttrBaseTest {
|
|||
fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
|
||||
fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
|
||||
|
||||
// XAttr does not exist.
|
||||
byte[] value = fs.getXAttr(path, name3);
|
||||
Assert.assertEquals(value, null);
|
||||
/* An XAttr that was requested does not exist. */
|
||||
try {
|
||||
final byte[] value = fs.getXAttr(path, name3);
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"At least one of the attributes provided was not found.", e);
|
||||
}
|
||||
|
||||
List<String> names = Lists.newArrayList();
|
||||
/* Throw an exception if an xattr that was requested does not exist. */
|
||||
{
|
||||
final List<String> names = Lists.newArrayList();
|
||||
names.add(name1);
|
||||
names.add(name2);
|
||||
names.add(name3);
|
||||
Map<String, byte[]> xattrs = fs.getXAttrs(path, names);
|
||||
Assert.assertEquals(xattrs.size(), 2);
|
||||
Assert.assertArrayEquals(value1, xattrs.get(name1));
|
||||
Assert.assertArrayEquals(value2, xattrs.get(name2));
|
||||
try {
|
||||
final Map<String, byte[]> xattrs = fs.getXAttrs(path, names);
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"At least one of the attributes provided was not found.", e);
|
||||
}
|
||||
}
|
||||
|
||||
fs.removeXAttr(path, name1);
|
||||
fs.removeXAttr(path, name2);
|
||||
|
||||
/* Unknown namespace should throw an exception. */
|
||||
try {
|
||||
final byte[] xattr = fs.getXAttr(path, "wackynamespace.foo");
|
||||
Assert.fail("expected IOException");
|
||||
} catch (Exception e) {
|
||||
GenericTestUtils.assertExceptionContains
|
||||
("An XAttr name must be prefixed with user/trusted/security/system, " +
|
||||
"followed by a '.'",
|
||||
e);
|
||||
}
|
||||
|
||||
/*
|
||||
* The 'trusted' namespace should not be accessible and should throw an
|
||||
* exception.
|
||||
*/
|
||||
final UserGroupInformation user = UserGroupInformation.
|
||||
createUserForTesting("user", new String[] {"mygroup"});
|
||||
fs.setXAttr(path, "trusted.foo", "1234".getBytes());
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(path, "trusted.foo");
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("User doesn't have permission", e);
|
||||
}
|
||||
|
||||
fs.setXAttr(path, name1, "1234".getBytes());
|
||||
|
||||
/*
|
||||
* Test that an exception is thrown if the caller doesn't have permission to
|
||||
* get the value of the xattr.
|
||||
*/
|
||||
|
||||
/* Set access so that only the owner has access. */
|
||||
fs.setPermission(path, new FsPermission((short) 0700));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(path, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* The caller must have search access to the parent directory.
|
||||
*/
|
||||
final Path childDir = new Path(path, "child" + pathCount);
|
||||
/* Set access to parent so that only the owner has access. */
|
||||
FileSystem.mkdirs(fs, childDir, FsPermission.createImmutable((short)0700));
|
||||
fs.setXAttr(childDir, name1, "1234".getBytes());
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/* Check that read access to the owning directory is not good enough. */
|
||||
fs.setPermission(path, new FsPermission((short) 0704));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that search access to the owning directory and search/execute
|
||||
* access to the actual entity with extended attributes is not good enough.
|
||||
*/
|
||||
fs.setPermission(path, new FsPermission((short) 0701));
|
||||
fs.setPermission(childDir, new FsPermission((short) 0701));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that search access to the owning directory and read access to
|
||||
* the actual entity with the extended attribute is good enough.
|
||||
*/
|
||||
fs.setPermission(path, new FsPermission((short) 0701));
|
||||
fs.setPermission(childDir, new FsPermission((short) 0704));
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
final byte[] xattr = userFs.getXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -402,6 +551,166 @@ public class FSXAttrBaseTest {
|
|||
fs.removeXAttr(path, name3);
|
||||
}
|
||||
|
||||
/**
|
||||
* removexattr tests. Test that removexattr throws an exception if any of
|
||||
* the following are true:
|
||||
* an xattr that was requested doesn't exist
|
||||
* the caller specifies an unknown namespace
|
||||
* the caller doesn't have access to the namespace
|
||||
* the caller doesn't have permission to get the value of the xattr
|
||||
* the caller does not have "execute" (scan) access to the parent directory
|
||||
* the caller has only read access to the owning directory
|
||||
* the caller has only execute access to the owning directory and execute
|
||||
* access to the actual entity
|
||||
* the caller does not have execute access to the owning directory and write
|
||||
* access to the actual entity
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testRemoveXAttrPermissions() throws Exception {
|
||||
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
|
||||
fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
|
||||
fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
|
||||
fs.setXAttr(path, name3, null, EnumSet.of(XAttrSetFlag.CREATE));
|
||||
|
||||
try {
|
||||
fs.removeXAttr(path, name2);
|
||||
fs.removeXAttr(path, name2);
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("No matching attributes found", e);
|
||||
}
|
||||
|
||||
/* Unknown namespace should throw an exception. */
|
||||
final String expectedExceptionString = "An XAttr name must be prefixed " +
|
||||
"with user/trusted/security/system, followed by a '.'";
|
||||
try {
|
||||
fs.removeXAttr(path, "wackynamespace.foo");
|
||||
Assert.fail("expected IOException");
|
||||
} catch (RemoteException e) {
|
||||
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
|
||||
HadoopIllegalArgumentException.class.getCanonicalName());
|
||||
GenericTestUtils.assertExceptionContains(expectedExceptionString, e);
|
||||
} catch (HadoopIllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(expectedExceptionString, e);
|
||||
}
|
||||
|
||||
/*
|
||||
* The 'trusted' namespace should not be accessible and should throw an
|
||||
* exception.
|
||||
*/
|
||||
final UserGroupInformation user = UserGroupInformation.
|
||||
createUserForTesting("user", new String[] {"mygroup"});
|
||||
fs.setXAttr(path, "trusted.foo", "1234".getBytes());
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(path, "trusted.foo");
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("User doesn't have permission", e);
|
||||
} finally {
|
||||
fs.removeXAttr(path, "trusted.foo");
|
||||
}
|
||||
|
||||
/*
|
||||
* Test that an exception is thrown if the caller doesn't have permission to
|
||||
* get the value of the xattr.
|
||||
*/
|
||||
|
||||
/* Set access so that only the owner has access. */
|
||||
fs.setPermission(path, new FsPermission((short) 0700));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(path, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* The caller must have "execute" (scan) access to the parent directory.
|
||||
*/
|
||||
final Path childDir = new Path(path, "child" + pathCount);
|
||||
/* Set access to parent so that only the owner has access. */
|
||||
FileSystem.mkdirs(fs, childDir, FsPermission.createImmutable((short)0700));
|
||||
fs.setXAttr(childDir, name1, "1234".getBytes());
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/* Check that read access to the owning directory is not good enough. */
|
||||
fs.setPermission(path, new FsPermission((short) 0704));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that execute access to the owning directory and scan access to
|
||||
* the actual entity with extended attributes is not good enough.
|
||||
*/
|
||||
fs.setPermission(path, new FsPermission((short) 0701));
|
||||
fs.setPermission(childDir, new FsPermission((short) 0701));
|
||||
try {
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Assert.fail("expected IOException");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Permission denied", e);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that execute access to the owning directory and write access to
|
||||
* the actual entity with extended attributes is good enough.
|
||||
*/
|
||||
fs.setPermission(path, new FsPermission((short) 0701));
|
||||
fs.setPermission(childDir, new FsPermission((short) 0706));
|
||||
user.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
final FileSystem userFs = dfsCluster.getFileSystem();
|
||||
userFs.removeXAttr(childDir, name1);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testRenameFileWithXAttr() throws Exception {
|
||||
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
|
||||
|
|
|
@ -415,7 +415,7 @@ public class TestNamenodeRetryCache {
|
|||
|
||||
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
|
||||
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
|
||||
assertEquals(22, cacheSet.size());
|
||||
assertEquals(23, cacheSet.size());
|
||||
|
||||
Map<CacheEntry, CacheEntry> oldEntries =
|
||||
new HashMap<CacheEntry, CacheEntry>();
|
||||
|
@ -434,7 +434,7 @@ public class TestNamenodeRetryCache {
|
|||
assertTrue(namesystem.hasRetryCache());
|
||||
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
|
||||
.getRetryCache().getCacheSet();
|
||||
assertEquals(22, cacheSet.size());
|
||||
assertEquals(23, cacheSet.size());
|
||||
iter = cacheSet.iterator();
|
||||
while (iter.hasNext()) {
|
||||
CacheEntry entry = iter.next();
|
||||
|
|
|
@ -160,7 +160,7 @@ public class TestRetryCacheWithHA {
|
|||
FSNamesystem fsn0 = cluster.getNamesystem(0);
|
||||
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
|
||||
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
|
||||
assertEquals(22, cacheSet.size());
|
||||
assertEquals(23, cacheSet.size());
|
||||
|
||||
Map<CacheEntry, CacheEntry> oldEntries =
|
||||
new HashMap<CacheEntry, CacheEntry>();
|
||||
|
@ -181,7 +181,7 @@ public class TestRetryCacheWithHA {
|
|||
FSNamesystem fsn1 = cluster.getNamesystem(1);
|
||||
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
|
||||
.getRetryCache().getCacheSet();
|
||||
assertEquals(22, cacheSet.size());
|
||||
assertEquals(23, cacheSet.size());
|
||||
iter = cacheSet.iterator();
|
||||
while (iter.hasNext()) {
|
||||
CacheEntry entry = iter.next();
|
||||
|
@ -1047,6 +1047,49 @@ public class TestRetryCacheWithHA {
|
|||
}
|
||||
}
|
||||
|
||||
/** removeXAttr */
|
||||
class RemoveXAttrOp extends AtMostOnceOp {
|
||||
private final String src;
|
||||
|
||||
RemoveXAttrOp(DFSClient client, String src) {
|
||||
super("removeXAttr", client);
|
||||
this.src = src;
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepare() throws Exception {
|
||||
Path p = new Path(src);
|
||||
if (!dfs.exists(p)) {
|
||||
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
||||
client.setXAttr(src, "user.key", "value".getBytes(),
|
||||
EnumSet.of(XAttrSetFlag.CREATE));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void invoke() throws Exception {
|
||||
client.removeXAttr(src, "user.key");
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean checkNamenodeBeforeReturn() throws Exception {
|
||||
for (int i = 0; i < CHECKTIMES; i++) {
|
||||
Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
|
||||
Set<String> keySet = iter.keySet();
|
||||
if (!keySet.contains("user.key")) {
|
||||
return true;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getResult() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testCreateSnapshot() throws Exception {
|
||||
final DFSClient client = genClientWithDummyHandler();
|
||||
|
@ -1183,6 +1226,13 @@ public class TestRetryCacheWithHA {
|
|||
testClientRetryWithFailover(op);
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testRemoveXAttr() throws Exception {
|
||||
DFSClient client = genClientWithDummyHandler();
|
||||
AtMostOnceOp op = new RemoveXAttrOp(client, "/removexattr");
|
||||
testClientRetryWithFailover(op);
|
||||
}
|
||||
|
||||
/**
|
||||
* When NN failover happens, if the client did not receive the response and
|
||||
* send a retry request to the other NN, the same response should be recieved
|
||||
|
|
|
@ -355,12 +355,6 @@ public class TestParam {
|
|||
public void testXAttrNameParam() {
|
||||
final XAttrNameParam p = new XAttrNameParam("user.a1");
|
||||
Assert.assertEquals(p.getXAttrName(), "user.a1");
|
||||
try {
|
||||
new XAttrNameParam("a1");
|
||||
Assert.fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.info("EXPECTED: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Binary file not shown.
|
@ -986,6 +986,8 @@
|
|||
<NAMESPACE>USER</NAMESPACE>
|
||||
<NAME>a2</NAME>
|
||||
</XATTR>
|
||||
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
|
||||
<RPC_CALLID>82</RPC_CALLID>
|
||||
</DATA>
|
||||
</RECORD>
|
||||
<RECORD>
|
||||
|
|
|
@ -62,6 +62,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
|
||||
(Li Lu via jianhe)
|
||||
|
||||
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
|
||||
devaraj)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -91,6 +94,12 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2273. NPE in ContinuousScheduling thread when we lose a node.
|
||||
(Wei Yan via kasha)
|
||||
|
||||
YARN-2313. Livelock can occur in FairScheduler when there are lots of
|
||||
running apps (Tsuyoshi Ozawa via Sandy Ryza)
|
||||
|
||||
YARN-2147. client lacks delegation token exception details when
|
||||
application submit fails (Chen He via jlowe)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -297,6 +306,9 @@ Release 2.5.0 - UNRELEASED
|
|||
YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and
|
||||
caused a task timeout for 30mins. (Sunil G via mayank)
|
||||
|
||||
YARN-2300. Improved the documentation of the sample requests for RM REST API -
|
||||
submitting an app. (Varun Vasudev via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -194,6 +194,12 @@
|
|||
<Field name="scheduleAsynchronously" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!-- Inconsistent sync warning - updateInterval is only initialized once and never changed -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
|
||||
<Field name="updateInterval" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
|
||||
|
|
|
@ -43,7 +43,7 @@ public class BaseContainerTokenSecretManager extends
|
|||
private static Log LOG = LogFactory
|
||||
.getLog(BaseContainerTokenSecretManager.class);
|
||||
|
||||
private int serialNo = new SecureRandom().nextInt();
|
||||
protected int serialNo = new SecureRandom().nextInt();
|
||||
|
||||
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
protected final Lock readLock = readWriteLock.readLock();
|
||||
|
|
|
@ -173,8 +173,8 @@ public class NodeManager extends CompositeService
|
|||
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||
throws IOException {
|
||||
if (nmStore.canRecover()) {
|
||||
nmTokenSecretManager.recover(nmStore.loadNMTokenState());
|
||||
// TODO: recover containerTokenSecretManager
|
||||
nmTokenSecretManager.recover();
|
||||
containerTokenSecretManager.recover();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,7 +190,7 @@ public class NodeManager extends CompositeService
|
|||
initAndStartRecoveryStore(conf);
|
||||
|
||||
NMContainerTokenSecretManager containerTokenSecretManager =
|
||||
new NMContainerTokenSecretManager(conf);
|
||||
new NMContainerTokenSecretManager(conf, nmStore);
|
||||
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager =
|
||||
new NMTokenSecretManagerInNM(nmStore);
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
|
@ -90,6 +91,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
|
||||
private static final String NM_TOKENS_PREV_MASTER_KEY =
|
||||
NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
|
||||
private static final String CONTAINER_TOKENS_KEY_PREFIX =
|
||||
"ContainerTokens/";
|
||||
private static final String CONTAINER_TOKENS_CURRENT_MASTER_KEY =
|
||||
CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
|
||||
private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
|
||||
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
|
||||
|
||||
private DB db;
|
||||
|
||||
|
@ -141,7 +148,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
key.substring(0, userEndPos+1)));
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
|
@ -260,7 +267,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), proto.toByteArray());
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,7 +290,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +313,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -355,7 +362,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
|
@ -371,7 +378,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), taskProto.toByteArray());
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,14 +388,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
||||
RecoveredNMTokenState state = new RecoveredNMTokenState();
|
||||
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||
RecoveredNMTokensState state = new RecoveredNMTokensState();
|
||||
state.applicationMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
LeveldbIterator iter = null;
|
||||
|
@ -420,7 +427,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
|
@ -454,7 +461,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -468,7 +475,91 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RecoveredContainerTokensState loadContainerTokensState()
|
||||
throws IOException {
|
||||
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
|
||||
state.activeTokens = new HashMap<ContainerId, Long>();
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
|
||||
final int containerTokensKeyPrefixLength =
|
||||
CONTAINER_TOKENS_KEY_PREFIX.length();
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
String fullKey = asString(entry.getKey());
|
||||
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
|
||||
break;
|
||||
}
|
||||
String key = fullKey.substring(containerTokensKeyPrefixLength);
|
||||
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
|
||||
state.currentMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
|
||||
state.previousMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
||||
loadContainerToken(state, fullKey, key, entry.getValue());
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
private static void loadContainerToken(RecoveredContainerTokensState state,
|
||||
String key, String containerIdStr, byte[] value) throws IOException {
|
||||
ContainerId containerId;
|
||||
Long expTime;
|
||||
try {
|
||||
containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
expTime = Long.parseLong(asString(value));
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IOException("Bad container token state for " + key, e);
|
||||
}
|
||||
state.activeTokens.put(containerId, expTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
storeMasterKey(CONTAINER_TOKENS_CURRENT_MASTER_KEY, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
storeMasterKey(CONTAINER_TOKENS_PREV_MASTER_KEY, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerToken(ContainerId containerId, Long expTime)
|
||||
throws IOException {
|
||||
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
|
||||
try {
|
||||
db.put(bytes(key), bytes(expTime.toString()));
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerToken(ContainerId containerId)
|
||||
throws IOException {
|
||||
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
|
||||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -554,7 +645,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
try {
|
||||
db.put(bytes(key), data);
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e.getMessage(), e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
|
@ -80,7 +81,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
||||
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Recovery not supported by this state store");
|
||||
}
|
||||
|
@ -105,6 +106,33 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
|||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredContainerTokensState loadContainerTokensState()
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Recovery not supported by this state store");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerToken(ContainerId containerId,
|
||||
Long expirationTime) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerToken(ContainerId containerId)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initStorage(Configuration conf) throws IOException {
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
|
@ -102,7 +103,7 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
public static class RecoveredNMTokenState {
|
||||
public static class RecoveredNMTokensState {
|
||||
MasterKey currentMasterKey;
|
||||
MasterKey previousMasterKey;
|
||||
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
|
||||
|
@ -120,6 +121,24 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
public static class RecoveredContainerTokensState {
|
||||
MasterKey currentMasterKey;
|
||||
MasterKey previousMasterKey;
|
||||
Map<ContainerId, Long> activeTokens;
|
||||
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
return currentMasterKey;
|
||||
}
|
||||
|
||||
public MasterKey getPreviousMasterKey() {
|
||||
return previousMasterKey;
|
||||
}
|
||||
|
||||
public Map<ContainerId, Long> getActiveTokens() {
|
||||
return activeTokens;
|
||||
}
|
||||
}
|
||||
|
||||
/** Initialize the state storage */
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws IOException {
|
||||
|
@ -193,7 +212,8 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
public abstract void removeDeletionTask(int taskId) throws IOException;
|
||||
|
||||
|
||||
public abstract RecoveredNMTokenState loadNMTokenState() throws IOException;
|
||||
public abstract RecoveredNMTokensState loadNMTokensState()
|
||||
throws IOException;
|
||||
|
||||
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
|
||||
throws IOException;
|
||||
|
@ -208,6 +228,22 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
ApplicationAttemptId attempt) throws IOException;
|
||||
|
||||
|
||||
public abstract RecoveredContainerTokensState loadContainerTokensState()
|
||||
throws IOException;
|
||||
|
||||
public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||
throws IOException;
|
||||
|
||||
public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||
throws IOException;
|
||||
|
||||
public abstract void storeContainerToken(ContainerId containerId,
|
||||
Long expirationTime) throws IOException;
|
||||
|
||||
public abstract void removeContainerToken(ContainerId containerId)
|
||||
throws IOException;
|
||||
|
||||
|
||||
protected abstract void initStorage(Configuration conf) throws IOException;
|
||||
|
||||
protected abstract void startStorage() throws IOException;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.security;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
|
@ -49,14 +53,74 @@ public class NMContainerTokenSecretManager extends
|
|||
|
||||
private MasterKeyData previousMasterKey;
|
||||
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
|
||||
|
||||
private final NMStateStoreService stateStore;
|
||||
|
||||
private String nodeHostAddr;
|
||||
|
||||
public NMContainerTokenSecretManager(Configuration conf) {
|
||||
this(conf, new NMNullStateStoreService());
|
||||
}
|
||||
|
||||
public NMContainerTokenSecretManager(Configuration conf,
|
||||
NMStateStoreService stateStore) {
|
||||
super(conf);
|
||||
recentlyStartedContainerTracker =
|
||||
new TreeMap<Long, List<ContainerId>>();
|
||||
this.stateStore = stateStore;
|
||||
}
|
||||
|
||||
public synchronized void recover()
|
||||
throws IOException {
|
||||
RecoveredContainerTokensState state =
|
||||
stateStore.loadContainerTokensState();
|
||||
MasterKey key = state.getCurrentMasterKey();
|
||||
if (key != null) {
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
|
||||
}
|
||||
|
||||
key = state.getPreviousMasterKey();
|
||||
if (key != null) {
|
||||
previousMasterKey =
|
||||
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
|
||||
}
|
||||
|
||||
// restore the serial number from the current master key
|
||||
if (super.currentMasterKey != null) {
|
||||
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
|
||||
}
|
||||
|
||||
for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
|
||||
ContainerId containerId = entry.getKey();
|
||||
Long expTime = entry.getValue();
|
||||
List<ContainerId> containerList =
|
||||
recentlyStartedContainerTracker.get(expTime);
|
||||
if (containerList == null) {
|
||||
containerList = new ArrayList<ContainerId>();
|
||||
recentlyStartedContainerTracker.put(expTime, containerList);
|
||||
}
|
||||
if (!containerList.contains(containerId)) {
|
||||
containerList.add(containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateCurrentMasterKey(MasterKeyData key) {
|
||||
super.currentMasterKey = key;
|
||||
try {
|
||||
stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to update current master key in state store", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void updatePreviousMasterKey(MasterKeyData key) {
|
||||
previousMasterKey = key;
|
||||
try {
|
||||
stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to update previous master key in state store", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,21 +132,16 @@ public class NMContainerTokenSecretManager extends
|
|||
*/
|
||||
@Private
|
||||
public synchronized void setMasterKey(MasterKey masterKeyRecord) {
|
||||
// Update keys only if the key has changed.
|
||||
if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
|
||||
.getKeyId() != masterKeyRecord.getKeyId()) {
|
||||
LOG.info("Rolling master-key for container-tokens, got key with id "
|
||||
+ masterKeyRecord.getKeyId());
|
||||
if (super.currentMasterKey == null) {
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
||||
.getBytes().array()));
|
||||
} else {
|
||||
if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
|
||||
.getKeyId()) {
|
||||
// Update keys only if the key has changed.
|
||||
this.previousMasterKey = super.currentMasterKey;
|
||||
super.currentMasterKey =
|
||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
||||
.getBytes().array()));
|
||||
if (super.currentMasterKey != null) {
|
||||
updatePreviousMasterKey(super.currentMasterKey);
|
||||
}
|
||||
updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
|
||||
createSecretKey(masterKeyRecord.getBytes().array())));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,14 +196,19 @@ public class NMContainerTokenSecretManager extends
|
|||
|
||||
removeAnyContainerTokenIfExpired();
|
||||
|
||||
ContainerId containerId = tokenId.getContainerID();
|
||||
Long expTime = tokenId.getExpiryTimeStamp();
|
||||
// We might have multiple containers with same expiration time.
|
||||
if (!recentlyStartedContainerTracker.containsKey(expTime)) {
|
||||
recentlyStartedContainerTracker
|
||||
.put(expTime, new ArrayList<ContainerId>());
|
||||
}
|
||||
recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
|
||||
|
||||
recentlyStartedContainerTracker.get(expTime).add(containerId);
|
||||
try {
|
||||
stateStore.storeContainerToken(containerId, expTime);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to store token for container " + containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void removeAnyContainerTokenIfExpired() {
|
||||
|
@ -155,6 +219,13 @@ public class NMContainerTokenSecretManager extends
|
|||
while (containersI.hasNext()) {
|
||||
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
|
||||
if (containerEntry.getKey() < currTime) {
|
||||
for (ContainerId container : containerEntry.getValue()) {
|
||||
try {
|
||||
stateStore.removeContainerToken(container);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove token for container " + container, e);
|
||||
}
|
||||
}
|
||||
containersI.remove();
|
||||
} else {
|
||||
break;
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
|
||||
|
@ -64,8 +64,9 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
|
|||
this.stateStore = stateStore;
|
||||
}
|
||||
|
||||
public synchronized void recover(RecoveredNMTokenState state)
|
||||
public synchronized void recover()
|
||||
throws IOException {
|
||||
RecoveredNMTokensState state = stateStore.loadNMTokensState();
|
||||
MasterKey key = state.getCurrentMasterKey();
|
||||
if (key != null) {
|
||||
super.currentMasterKey =
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
|
@ -36,7 +37,8 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
|||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||
private Map<TrackerKey, TrackerState> trackerStates;
|
||||
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
||||
private RecoveredNMTokenState nmTokenState;
|
||||
private RecoveredNMTokensState nmTokenState;
|
||||
private RecoveredContainerTokensState containerTokenState;
|
||||
|
||||
public NMMemoryStateStoreService() {
|
||||
super(NMMemoryStateStoreService.class.getName());
|
||||
|
@ -117,12 +119,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
|
||||
@Override
|
||||
protected void initStorage(Configuration conf) {
|
||||
nmTokenState = new RecoveredNMTokenState();
|
||||
nmTokenState = new RecoveredNMTokensState();
|
||||
nmTokenState.applicationMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
containerTokenState = new RecoveredContainerTokensState();
|
||||
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
|
||||
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
||||
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -157,9 +160,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
|
||||
|
||||
@Override
|
||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
||||
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||
// return a copy so caller can't modify our state
|
||||
RecoveredNMTokenState result = new RecoveredNMTokenState();
|
||||
RecoveredNMTokensState result = new RecoveredNMTokensState();
|
||||
result.currentMasterKey = nmTokenState.currentMasterKey;
|
||||
result.previousMasterKey = nmTokenState.previousMasterKey;
|
||||
result.applicationMasterKeys =
|
||||
|
@ -197,6 +200,48 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RecoveredContainerTokensState loadContainerTokensState()
|
||||
throws IOException {
|
||||
// return a copy so caller can't modify our state
|
||||
RecoveredContainerTokensState result =
|
||||
new RecoveredContainerTokensState();
|
||||
result.currentMasterKey = containerTokenState.currentMasterKey;
|
||||
result.previousMasterKey = containerTokenState.previousMasterKey;
|
||||
result.activeTokens =
|
||||
new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
|
||||
containerTokenState.currentMasterKey =
|
||||
new MasterKeyPBImpl(keypb.getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||
throws IOException {
|
||||
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
|
||||
containerTokenState.previousMasterKey =
|
||||
new MasterKeyPBImpl(keypb.getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerToken(ContainerId containerId,
|
||||
Long expirationTime) throws IOException {
|
||||
containerTokenState.activeTokens.put(containerId, expirationTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerToken(ContainerId containerId)
|
||||
throws IOException {
|
||||
containerTokenState.activeTokens.remove(containerId);
|
||||
}
|
||||
|
||||
|
||||
private static class TrackerState {
|
||||
Map<Path, LocalResourceProto> inProgressMap =
|
||||
new HashMap<Path, LocalResourceProto>();
|
||||
|
|
|
@ -27,11 +27,13 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.service.ServiceStateException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
|
@ -42,12 +44,15 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
|
|||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -502,7 +507,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
@Test
|
||||
public void testNMTokenStorage() throws IOException {
|
||||
// test empty when no state
|
||||
RecoveredNMTokenState state = stateStore.loadNMTokenState();
|
||||
RecoveredNMTokensState state = stateStore.loadNMTokensState();
|
||||
assertNull(state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
|
@ -512,7 +517,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
MasterKey currentKey = secretMgr.generateKey();
|
||||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokenState();
|
||||
state = stateStore.loadNMTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
|
@ -521,7 +526,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
MasterKey prevKey = secretMgr.generateKey();
|
||||
stateStore.storeNMTokenPreviousMasterKey(prevKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokenState();
|
||||
state = stateStore.loadNMTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
|
@ -536,7 +541,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
MasterKey attemptKey2 = secretMgr.generateKey();
|
||||
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokenState();
|
||||
state = stateStore.loadNMTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
|
||||
|
@ -558,7 +563,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
currentKey = secretMgr.generateKey();
|
||||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokenState();
|
||||
state = stateStore.loadNMTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
loadedAppKeys = state.getApplicationMasterKeys();
|
||||
|
@ -568,10 +573,89 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerTokenStorage() throws IOException {
|
||||
// test empty when no state
|
||||
RecoveredContainerTokensState state =
|
||||
stateStore.loadContainerTokensState();
|
||||
assertNull(state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
|
||||
// store a master key and verify recovered
|
||||
ContainerTokenKeyGeneratorForTest keygen =
|
||||
new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
|
||||
MasterKey currentKey = keygen.generateKey();
|
||||
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
|
||||
// store a previous key and verify recovered
|
||||
MasterKey prevKey = keygen.generateKey();
|
||||
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
|
||||
// store a few container tokens and verify recovered
|
||||
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
Long expTime1 = 1234567890L;
|
||||
ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
|
||||
Long expTime2 = 9876543210L;
|
||||
stateStore.storeContainerToken(cid1, expTime1);
|
||||
stateStore.storeContainerToken(cid2, expTime2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
Map<ContainerId, Long> loadedActiveTokens =
|
||||
state.getActiveTokens();
|
||||
assertEquals(2, loadedActiveTokens.size());
|
||||
assertEquals(expTime1, loadedActiveTokens.get(cid1));
|
||||
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||
|
||||
// add/update/remove tokens and verify recovered
|
||||
ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
|
||||
Long expTime3 = 135798642L;
|
||||
stateStore.storeContainerToken(cid3, expTime3);
|
||||
stateStore.removeContainerToken(cid1);
|
||||
expTime2 += 246897531L;
|
||||
stateStore.storeContainerToken(cid2, expTime2);
|
||||
prevKey = currentKey;
|
||||
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
|
||||
currentKey = keygen.generateKey();
|
||||
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
loadedActiveTokens = state.getActiveTokens();
|
||||
assertEquals(2, loadedActiveTokens.size());
|
||||
assertNull(loadedActiveTokens.get(cid1));
|
||||
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||
assertEquals(expTime3, loadedActiveTokens.get(cid3));
|
||||
}
|
||||
|
||||
private static class NMTokenSecretManagerForTest extends
|
||||
BaseNMTokenSecretManager {
|
||||
public MasterKey generateKey() {
|
||||
return createNewMasterKey().getMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ContainerTokenKeyGeneratorForTest extends
|
||||
BaseContainerTokenSecretManager {
|
||||
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public MasterKey generateKey() {
|
||||
return createNewMasterKey().getMasterKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.security;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestNMContainerTokenSecretManager {
|
||||
|
||||
@Test
|
||||
public void testRecovery() throws IOException {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||
final NodeId nodeId = NodeId.newInstance("somehost", 1234);
|
||||
final ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
final ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
|
||||
ContainerTokenKeyGeneratorForTest keygen =
|
||||
new ContainerTokenKeyGeneratorForTest(conf);
|
||||
NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||
stateStore.init(conf);
|
||||
stateStore.start();
|
||||
NMContainerTokenSecretManager secretMgr =
|
||||
new NMContainerTokenSecretManager(conf, stateStore);
|
||||
secretMgr.setNodeId(nodeId);
|
||||
MasterKey currentKey = keygen.generateKey();
|
||||
secretMgr.setMasterKey(currentKey);
|
||||
ContainerTokenIdentifier tokenId1 =
|
||||
createContainerTokenId(cid1, nodeId, "user1", secretMgr);
|
||||
ContainerTokenIdentifier tokenId2 =
|
||||
createContainerTokenId(cid2, nodeId, "user2", secretMgr);
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||
|
||||
// restart and verify tokens still valid
|
||||
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||
secretMgr.setNodeId(nodeId);
|
||||
secretMgr.recover();
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||
assertTrue(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||
|
||||
// roll master key and start a container
|
||||
secretMgr.startContainerSuccessful(tokenId2);
|
||||
currentKey = keygen.generateKey();
|
||||
secretMgr.setMasterKey(currentKey);
|
||||
|
||||
// restart and verify tokens still valid due to prev key persist
|
||||
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||
secretMgr.setNodeId(nodeId);
|
||||
secretMgr.recover();
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||
|
||||
// roll master key again, restart, and verify keys no longer valid
|
||||
currentKey = keygen.generateKey();
|
||||
secretMgr.setMasterKey(currentKey);
|
||||
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||
secretMgr.setNodeId(nodeId);
|
||||
secretMgr.recover();
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||
try {
|
||||
secretMgr.retrievePassword(tokenId1);
|
||||
fail("token should not be valid");
|
||||
} catch (InvalidToken e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
secretMgr.retrievePassword(tokenId2);
|
||||
fail("token should not be valid");
|
||||
} catch (InvalidToken e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
stateStore.close();
|
||||
}
|
||||
|
||||
private static ContainerTokenIdentifier createContainerTokenId(
|
||||
ContainerId cid, NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager secretMgr) throws IOException {
|
||||
long rmid = cid.getApplicationAttemptId().getApplicationId()
|
||||
.getClusterTimestamp();
|
||||
ContainerTokenIdentifier ctid = new ContainerTokenIdentifier(cid,
|
||||
nodeId.toString(), user, BuilderUtils.newResource(1024, 1),
|
||||
System.currentTimeMillis() + 100000L,
|
||||
secretMgr.getCurrentKey().getKeyId(), rmid,
|
||||
Priority.newInstance(0), 0);
|
||||
Token token = BuilderUtils.newContainerToken(nodeId,
|
||||
secretMgr.createPassword(ctid), ctid);
|
||||
return BuilderUtils.newContainerTokenIdentifier(token);
|
||||
}
|
||||
|
||||
private static class ContainerTokenKeyGeneratorForTest extends
|
||||
BaseContainerTokenSecretManager {
|
||||
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public MasterKey generateKey() {
|
||||
return createNewMasterKey().getMasterKey();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -73,7 +73,7 @@ public class TestNMTokenSecretManagerInNM {
|
|||
|
||||
// restart and verify key is still there and token still valid
|
||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||
secretMgr.recover(stateStore.loadNMTokenState());
|
||||
secretMgr.recover();
|
||||
secretMgr.setNodeId(nodeId);
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||
|
@ -88,7 +88,7 @@ public class TestNMTokenSecretManagerInNM {
|
|||
|
||||
// restart and verify attempt1 key is still valid due to prev key persist
|
||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||
secretMgr.recover(stateStore.loadNMTokenState());
|
||||
secretMgr.recover();
|
||||
secretMgr.setNodeId(nodeId);
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||
|
@ -101,7 +101,7 @@ public class TestNMTokenSecretManagerInNM {
|
|||
currentKey = keygen.generateKey();
|
||||
secretMgr.setMasterKey(currentKey);
|
||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||
secretMgr.recover(stateStore.loadNMTokenState());
|
||||
secretMgr.recover();
|
||||
secretMgr.setNodeId(nodeId);
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||
|
@ -117,7 +117,7 @@ public class TestNMTokenSecretManagerInNM {
|
|||
// remove last attempt, restart, verify both tokens are now bad
|
||||
secretMgr.appFinished(attempt2.getApplicationId());
|
||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||
secretMgr.recover(stateStore.loadNMTokenState());
|
||||
secretMgr.recover();
|
||||
secretMgr.setNodeId(nodeId);
|
||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||
|
|
|
@ -135,7 +135,7 @@ public class FairScheduler extends
|
|||
public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
|
||||
|
||||
// How often fair shares are re-calculated (ms)
|
||||
protected long UPDATE_INTERVAL = 500;
|
||||
protected long updateInterval;
|
||||
private final int UPDATE_DEBUG_FREQUENCY = 5;
|
||||
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||
|
||||
|
@ -244,13 +244,13 @@ public class FairScheduler extends
|
|||
|
||||
/**
|
||||
* A runnable which calls {@link FairScheduler#update()} every
|
||||
* <code>UPDATE_INTERVAL</code> milliseconds.
|
||||
* <code>updateInterval</code> milliseconds.
|
||||
*/
|
||||
private class UpdateThread implements Runnable {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(UPDATE_INTERVAL);
|
||||
Thread.sleep(updateInterval);
|
||||
update();
|
||||
preemptTasksIfNecessary();
|
||||
} catch (Exception e) {
|
||||
|
@ -1206,6 +1206,15 @@ public class FairScheduler extends
|
|||
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
||||
usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||
|
||||
updateInterval = this.conf.getUpdateInterval();
|
||||
if (updateInterval < 0) {
|
||||
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
|
||||
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
|
||||
+ " is invalid, so using default value " +
|
||||
+ FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
|
||||
+ " ms instead");
|
||||
}
|
||||
|
||||
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
||||
// This stores per-application scheduling information
|
||||
this.applications =
|
||||
|
|
|
@ -123,6 +123,11 @@ public class FairSchedulerConfiguration extends Configuration {
|
|||
protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
|
||||
protected static final int DEFAULT_MAX_ASSIGN = -1;
|
||||
|
||||
/** The update interval for calculating resources in FairScheduler .*/
|
||||
public static final String UPDATE_INTERVAL_MS =
|
||||
CONF_PREFIX + "update-interval-ms";
|
||||
public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
|
||||
|
||||
public FairSchedulerConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
@ -247,6 +252,10 @@ public class FairSchedulerConfiguration extends Configuration {
|
|||
}
|
||||
}
|
||||
|
||||
public long getUpdateInterval() {
|
||||
return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
|
||||
}
|
||||
|
||||
private static int findResource(String val, String units)
|
||||
throws AllocationConfigurationException {
|
||||
Pattern pattern = Pattern.compile("(\\d+) ?" + units);
|
||||
|
|
|
@ -388,7 +388,11 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
// If user provides incorrect token then it should not be added for
|
||||
// renewal.
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
try {
|
||||
renewToken(dtr);
|
||||
} catch (IOException ioe) {
|
||||
throw new IOException("Failed to renew token: " + dtr.token, ioe);
|
||||
}
|
||||
}
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
addTokenToList(dtr);
|
||||
|
|
|
@ -94,7 +94,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
|
|||
scheduler = (FairScheduler)resourceManager.getResourceScheduler();
|
||||
|
||||
scheduler.setClock(clock);
|
||||
scheduler.UPDATE_INTERVAL = 60 * 1000;
|
||||
scheduler.updateInterval = 60 * 1000;
|
||||
}
|
||||
|
||||
private void registerNodeAndSubmitApp(
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -674,6 +675,39 @@ public class TestDelegationTokenRenewer {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testDTRonAppSubmission()
|
||||
throws IOException, InterruptedException, BrokenBarrierException {
|
||||
final Credentials credsx = new Credentials();
|
||||
final Token<?> tokenx = mock(Token.class);
|
||||
credsx.addToken(new Text("token"), tokenx);
|
||||
doReturn(true).when(tokenx).isManaged();
|
||||
doThrow(new IOException("boom"))
|
||||
.when(tokenx).renew(any(Configuration.class));
|
||||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
dtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
||||
dtr.init(conf);
|
||||
dtr.start();
|
||||
|
||||
try {
|
||||
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
|
||||
fail("Catch IOException on app submission");
|
||||
} catch (IOException e){
|
||||
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
|
||||
Assert.assertTrue(e.getCause().toString().contains("boom"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testConcurrentAddApplication()
|
||||
throws IOException, InterruptedException, BrokenBarrierException {
|
||||
|
|
|
@ -205,6 +205,12 @@ Properties that can be placed in yarn-site.xml
|
|||
instead. Defaults to true. If a queue placement policy is given in the
|
||||
allocations file, this property is ignored.
|
||||
|
||||
* <<<yarn.scheduler.fair.update-interval-ms>>>
|
||||
|
||||
* The interval at which to lock the scheduler and recalculate fair shares,
|
||||
recalculate demand, and check whether anything is due for preemption.
|
||||
Defaults to 500 ms.
|
||||
|
||||
Allocation file format
|
||||
|
||||
The allocation file must be in XML format. The format contains five types of
|
||||
|
|
|
@ -2228,68 +2228,48 @@ _01_000001</amContainerLogs>
|
|||
{
|
||||
"application-id":"application_1404203615263_0001",
|
||||
"application-name":"test",
|
||||
"queue":"testqueue",
|
||||
"priority":"3",
|
||||
"am-container-spec":
|
||||
{
|
||||
"local-resources":
|
||||
{
|
||||
"entry":
|
||||
[
|
||||
{
|
||||
"key":"example",
|
||||
"key":"AppMaster.jar",
|
||||
"value":
|
||||
{
|
||||
"resource":"http://www.test.com/file.txt",
|
||||
"resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
|
||||
"type":"FILE",
|
||||
"visibility":"APPLICATION",
|
||||
"size":"100",
|
||||
"timestamp":"1404203616003"
|
||||
"size": "43004",
|
||||
"timestamp": "1405452071209"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"commands":
|
||||
{
|
||||
"command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
|
||||
},
|
||||
"environment":
|
||||
{
|
||||
"entry":
|
||||
{
|
||||
"key":"APP_VAR",
|
||||
"value":"ENV_SETTING"
|
||||
}
|
||||
},
|
||||
"commands":
|
||||
{
|
||||
"command":"/bin/sleep 5"
|
||||
},
|
||||
"service-data":
|
||||
{
|
||||
"entry":
|
||||
{
|
||||
"key":"test",
|
||||
"value":"dmFsdWUxMg"
|
||||
}
|
||||
},
|
||||
"credentials":
|
||||
{
|
||||
"tokens":null,
|
||||
"secrets":
|
||||
{
|
||||
"entry":
|
||||
{
|
||||
"key":"secret1",
|
||||
"value":"c2VjcmV0MQ"
|
||||
}
|
||||
}
|
||||
},
|
||||
"application-acls":
|
||||
{
|
||||
"entry":
|
||||
[
|
||||
{
|
||||
"key":"VIEW_APP",
|
||||
"value":"testuser3, testuser4"
|
||||
"key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
|
||||
"value": "1405459400754"
|
||||
},
|
||||
{
|
||||
"key":"MODIFY_APP",
|
||||
"value":"testuser1, testuser2"
|
||||
"key": "CLASSPATH",
|
||||
"value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
|
||||
},
|
||||
{
|
||||
"key": "DISTRIBUTEDSHELLSCRIPTLEN",
|
||||
"value": "6"
|
||||
},
|
||||
{
|
||||
"key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
|
||||
"value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -2302,16 +2282,9 @@ _01_000001</amContainerLogs>
|
|||
"vCores":"1"
|
||||
},
|
||||
"application-type":"YARN",
|
||||
"keep-containers-across-application-attempts":"false",
|
||||
"application-tags":
|
||||
{
|
||||
"tag":
|
||||
[
|
||||
"tag 2",
|
||||
"tag1"
|
||||
]
|
||||
}
|
||||
"keep-containers-across-application-attempts":"false"
|
||||
}
|
||||
|
||||
+---+
|
||||
|
||||
Response Header:
|
||||
|
@ -2349,22 +2322,34 @@ _01_000001</amContainerLogs>
|
|||
<entry>
|
||||
<key>example</key>
|
||||
<value>
|
||||
<resource>http://www.test.com/file.txt</resource>
|
||||
<resource>hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar</resource>
|
||||
<type>FILE</type>
|
||||
<visibility>APPLICATION</visibility>
|
||||
<size>100</size>
|
||||
<timestamp>1404204892877</timestamp>
|
||||
<size>43004</size>
|
||||
<timestamp>1405452071209</timestamp>
|
||||
</value>
|
||||
</entry>
|
||||
</local-resources>
|
||||
<environment>
|
||||
<entry>
|
||||
<key>APP_VAR</key>
|
||||
<value>ENV_SETTING</value>
|
||||
<key>DISTRIBUTEDSHELLSCRIPTTIMESTAMP</key>
|
||||
<value>1405459400754</value>
|
||||
</entry>
|
||||
<entry>
|
||||
<key>CLASSPATH</key>
|
||||
<value>{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties</value>
|
||||
</entry>
|
||||
<entry>
|
||||
<key>DISTRIBUTEDSHELLSCRIPTLEN</key>
|
||||
<value>6</value>
|
||||
</entry>
|
||||
<entry>
|
||||
<key>DISTRIBUTEDSHELLSCRIPTLOCATION</key>
|
||||
<value>hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands</value>
|
||||
</entry>
|
||||
</environment>
|
||||
<commands>
|
||||
<command>/bin/sleep 5</command>
|
||||
<command>{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr</command>
|
||||
</commands>
|
||||
<service-data>
|
||||
<entry>
|
||||
|
|
Loading…
Reference in New Issue