Merge from trunk to branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1613334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-07-24 23:55:15 +00:00
commit e670641bbf
60 changed files with 1835 additions and 232 deletions

View File

@ -189,6 +189,9 @@ Trunk (Unreleased)
HADOOP-10720. KMS: Implement generateEncryptedKey and decryptEncryptedKey
in the REST API. (asuresh via tucu)
HADOOP-10891. Add EncryptedKeyVersion factory method to
KeyProviderCryptoExtension. (wang)
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.
@ -452,6 +455,11 @@ Release 2.6.0 - UNRELEASED
HADOOP-10855. Allow Text to be read with a known Length. (todd)
HADOOP-10887. Add XAttrs to ViewFs and make XAttrs + ViewFileSystem
internal dir behavior consistent. (Stephen Chu via wang)
HADOOP-10882. Move DirectBufferPool into common util. (todd)
OPTIMIZATIONS
BUG FIXES
@ -811,6 +819,12 @@ Release 2.5.0 - UNRELEASED
HADOOP-10872. TestPathData fails intermittently with "Mkdirs failed
to create d1". (Yongjun Zhang via Arpit Agarwal)
HADOOP-10890. TestDFVariations.testMount fails intermittently. (Yongjun
Zhang via Arpit Agarwal)
HADOOP-10894. Fix dead link in ToolRunner documentation. (Akira Ajisaka
via Arpit Agarwal)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -79,6 +79,30 @@ public class KeyProviderCryptoExtension extends
this.encryptedKeyVersion = encryptedKeyVersion;
}
/**
* Factory method to create a new EncryptedKeyVersion that can then be
* passed into {@link #decryptEncryptedKey}. Note that the fields of the
* returned EncryptedKeyVersion will only partially be populated; it is not
* necessarily suitable for operations besides decryption.
*
* @param encryptionKeyVersionName Version name of the encryption key used
* to encrypt the encrypted key.
* @param encryptedKeyIv Initialization vector of the encrypted
* key. The IV of the encryption key used to
* encrypt the encrypted key is derived from
* this IV.
* @param encryptedKeyMaterial Key material of the encrypted key.
* @return EncryptedKeyVersion suitable for decryption.
*/
public static EncryptedKeyVersion createForDecryption(String
encryptionKeyVersionName, byte[] encryptedKeyIv,
byte[] encryptedKeyMaterial) {
KeyVersion encryptedKeyVersion = new KeyVersion(null, null,
encryptedKeyMaterial);
return new EncryptedKeyVersion(null, encryptionKeyVersionName,
encryptedKeyIv, encryptedKeyVersion);
}
/**
* @return Name of the encryption key used to encrypt the encrypted key.
*/

View File

@ -2484,4 +2484,33 @@ public final class FileContext {
}
}.resolve(this, absF);
}
/**
* Get all of the xattr names for a file or directory.
* Only those xattr names which the logged-in user has permissions to view
* are returned.
* <p/>
* A regular user can only get xattr names for the "user" namespace.
* The super user can only get xattr names for "user" and "trusted"
* namespaces.
* The xattrs of the "security" and "system" namespaces are only
* used/exposed internally by/to the FS impl.
* <p/>
* @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
* http://en.wikipedia.org/wiki/Extended_file_attributes</a>
*
* @param path Path to get extended attributes
* @return List<String> of the XAttr names of the file or directory
* @throws IOException
*/
public List<String> listXAttrs(Path path) throws IOException {
final Path absF = fixRelativePart(path);
return new FSLinkResolver<List<String>>() {
@Override
public List<String> next(final AbstractFileSystem fs, final Path p)
throws IOException {
return fs.listXAttrs(p);
}
}.resolve(this, absF);
}
}

View File

@ -2509,7 +2509,7 @@ public abstract class FileSystem extends Configured implements Closeable {
* http://en.wikipedia.org/wiki/Extended_file_attributes</a>
*
* @param path Path to get extended attributes
* @return Map<String, byte[]> describing the XAttrs of the file or directory
* @return List<String> of the XAttr names of the file or directory
* @throws IOException
*/
public List<String> listXAttrs(Path path) throws IOException {

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
@ -313,6 +315,38 @@ class ChRootedFs extends AbstractFileSystem {
return myFs.getAclStatus(fullPath(path));
}
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
myFs.setXAttr(fullPath(path), name, value, flag);
}
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
return myFs.getXAttr(fullPath(path), name);
}
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
return myFs.getXAttrs(fullPath(path));
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
return myFs.getXAttrs(fullPath(path), names);
}
@Override
public List<String> listXAttrs(Path path) throws IOException {
return myFs.listXAttrs(fullPath(path));
}
@Override
public void removeXAttr(Path path, String name) throws IOException {
myFs.removeXAttr(fullPath(path), name);
}
@Override
public void setVerifyChecksum(final boolean verifyChecksum)
throws IOException, UnresolvedLinkException {

View File

@ -913,5 +913,39 @@ public class ViewFileSystem extends FileSystem {
.addEntries(AclUtil.getMinimalAcl(PERMISSION_555))
.stickyBit(false).build();
}
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
checkPathIsSlash(path);
throw readOnlyMountTable("setXAttr", path);
}
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new NotInMountpointException(path, "getXAttr");
}
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new NotInMountpointException(path, "getXAttrs");
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
throw new NotInMountpointException(path, "getXAttrs");
}
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new NotInMountpointException(path, "listXAttrs");
}
@Override
public void removeXAttr(Path path, String name) throws IOException {
checkPathIsSlash(path);
throw readOnlyMountTable("removeXAttr", path);
}
}
}

View File

@ -26,6 +26,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
@ -48,6 +49,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.local.LocalConfigKeys;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclUtil;
@ -651,6 +653,50 @@ public class ViewFs extends AbstractFileSystem {
fsState.resolve(getUriPath(path), true);
return res.targetFileSystem.getAclStatus(res.remainingPath);
}
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
res.targetFileSystem.setXAttr(res.remainingPath, name, value, flag);
}
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
return res.targetFileSystem.getXAttr(res.remainingPath, name);
}
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
return res.targetFileSystem.getXAttrs(res.remainingPath);
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
return res.targetFileSystem.getXAttrs(res.remainingPath, names);
}
@Override
public List<String> listXAttrs(Path path) throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
return res.targetFileSystem.listXAttrs(res.remainingPath);
}
@Override
public void removeXAttr(Path path, String name) throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
res.targetFileSystem.removeXAttr(res.remainingPath, name);
}
/*
@ -921,5 +967,39 @@ public class ViewFs extends AbstractFileSystem {
.addEntries(AclUtil.getMinimalAcl(PERMISSION_555))
.stickyBit(false).build();
}
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
checkPathIsSlash(path);
throw readOnlyMountTable("setXAttr", path);
}
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new NotInMountpointException(path, "getXAttr");
}
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new NotInMountpointException(path, "getXAttrs");
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
throw new NotInMountpointException(path, "getXAttrs");
}
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new NotInMountpointException(path, "listXAttrs");
}
@Override
public void removeXAttr(Path path, String name) throws IOException {
checkPathIsSlash(path);
throw readOnlyMountTable("removeXAttr", path);
}
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
package org.apache.hadoop.util;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A simple class for pooling direct ByteBuffers. This is necessary
@ -40,7 +41,8 @@ import com.google.common.annotations.VisibleForTesting;
* allocated at the same size. There is no attempt to reuse larger
* buffers to satisfy smaller allocations.
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class DirectBufferPool {
// Essentially implement a multimap with weak values.

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
* <p><code>ToolRunner</code> can be used to run classes implementing
* <code>Tool</code> interface. It works in conjunction with
* {@link GenericOptionsParser} to parse the
* <a href="{@docRoot}/org/apache/hadoop/util/GenericOptionsParser.html#GenericOptions">
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/CommandsManual.html#Generic_Options">
* generic hadoop command line arguments</a> and modifies the
* <code>Configuration</code> of the <code>Tool</code>. The
* application-specific options are passed along without being modified.

View File

@ -29,14 +29,33 @@ import java.util.Random;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestDFVariations {
private static final String TEST_ROOT_DIR =
System.getProperty("test.build.data","build/test/data") + "/TestDFVariations";
private static File test_root = null;
@Before
public void setup() throws IOException {
test_root = new File(TEST_ROOT_DIR);
test_root.mkdirs();
}
@After
public void after() throws IOException {
FileUtil.setWritable(test_root, true);
FileUtil.fullyDelete(test_root);
assertTrue(!test_root.exists());
}
public static class XXDF extends DF {
public XXDF() throws IOException {
super(new File(System.getProperty("test.build.data","/tmp")), 0L);
super(test_root, 0L);
}
@Override

View File

@ -773,4 +773,34 @@ public class ViewFileSystemBaseTest {
assertFalse(aclStatus.isStickyBit());
}
@Test(expected=AccessControlException.class)
public void testInternalSetXAttr() throws IOException {
fsView.setXAttr(new Path("/internalDir"), "xattrName", null);
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttr() throws IOException {
fsView.getXAttr(new Path("/internalDir"), "xattrName");
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttrs() throws IOException {
fsView.getXAttrs(new Path("/internalDir"));
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttrsWithNames() throws IOException {
fsView.getXAttrs(new Path("/internalDir"), new ArrayList<String>());
}
@Test(expected=NotInMountpointException.class)
public void testInternalListXAttr() throws IOException {
fsView.listXAttrs(new Path("/internalDir"));
}
@Test(expected=AccessControlException.class)
public void testInternalRemoveXAttr() throws IOException {
fsView.removeXAttr(new Path("/internalDir"), "xattrName");
}
}

View File

@ -747,4 +747,34 @@ public class ViewFsBaseTest {
AclUtil.getMinimalAcl(PERMISSION_555));
assertFalse(aclStatus.isStickyBit());
}
@Test(expected=AccessControlException.class)
public void testInternalSetXAttr() throws IOException {
fcView.setXAttr(new Path("/internalDir"), "xattrName", null);
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttr() throws IOException {
fcView.getXAttr(new Path("/internalDir"), "xattrName");
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttrs() throws IOException {
fcView.getXAttrs(new Path("/internalDir"));
}
@Test(expected=NotInMountpointException.class)
public void testInternalGetXAttrsWithNames() throws IOException {
fcView.getXAttrs(new Path("/internalDir"), new ArrayList<String>());
}
@Test(expected=NotInMountpointException.class)
public void testInternalListXAttr() throws IOException {
fcView.listXAttrs(new Path("/internalDir"));
}
@Test(expected=AccessControlException.class)
public void testInternalRemoveXAttr() throws IOException {
fcView.removeXAttr(new Path("/internalDir"), "xattrName");
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
package org.apache.hadoop.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@ -29,7 +29,7 @@ import org.junit.Test;
import com.google.common.collect.Lists;
public class TestDirectBufferPool {
final DirectBufferPool pool = new DirectBufferPool();
final org.apache.hadoop.util.DirectBufferPool pool = new org.apache.hadoop.util.DirectBufferPool();
@Test
public void testBasics() {

View File

@ -53,7 +53,12 @@ public class NfsExports {
long expirationPeriodNano = conf.getLong(
Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY,
Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT) * 1000 * 1000;
exports = new NfsExports(cacheSize, expirationPeriodNano, matchHosts);
try {
exports = new NfsExports(cacheSize, expirationPeriodNano, matchHosts);
} catch (IllegalArgumentException e) {
LOG.error("Invalid NFS Exports provided: ", e);
return exports;
}
}
return exports;
}

View File

@ -104,6 +104,10 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
@Override
public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
if (hostsMatcher == null) {
return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
null);
}
AccessPrivilege accessPrivilege = hostsMatcher.getAccessPrivilege(client);
if (accessPrivilege == AccessPrivilege.NONE) {
return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
@ -208,16 +212,23 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
} else if (mntproc == MNTPROC.UMNTALL) {
umntall(out, xid, client);
} else if (mntproc == MNTPROC.EXPORT) {
// Currently only support one NFS export
// Currently only support one NFS export
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
hostsMatchers.add(hostsMatcher);
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
if (hostsMatcher != null) {
hostsMatchers.add(hostsMatcher);
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
} else {
// This means there are no valid exports provided.
RpcAcceptedReply.getInstance(xid,
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
} else {
// Invalid procedure
RpcAcceptedReply.getInstance(xid,
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
}
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);

View File

@ -2123,8 +2123,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
if (!doPortMonitoring(remoteAddress)) {
return false;
}
// Check export table
if (exports == null) {
return false;
}
InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
AccessPrivilege access = exports.getAccessPrivilege(client);
if (access == AccessPrivilege.NONE) {

View File

@ -252,6 +252,9 @@ Trunk (Unreleased)
HDFS-5794. Fix the inconsistency of layout version number of
ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
HDFS-6657. Remove link to 'Legacy UI' in trunk's Namenode UI.
(Vinayakumar B via wheat 9)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -357,6 +360,16 @@ Release 2.6.0 - UNRELEASED
HDFS-6731. Run "hdfs zkfc-formatZK" on a server in a non-namenode will cause
a null pointer exception. (Masatake Iwasaki via brandonli)
HDFS-6114. Block Scan log rolling will never happen if blocks written
continuously leading to huge size of dncp_block_verification.log.curr
(vinayakumarb via cmccabe)
HDFS-6455. NFS: Exception should be added in NFS log for invalid separator in
nfs.exports.allowed.hosts. (Abhiraj Butala via brandonli)
HDFS-6715. Webhdfs wont fail over when it gets java.io.IOException: Namenode
is in startup mode. (jing9)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -892,6 +905,12 @@ Release 2.5.0 - UNRELEASED
HDFS-6703. NFS: Files can be deleted from a read-only mount
(Srikanth Upputuri via brandonli)
HDFS-6422. getfattr in CLI doesn't throw exception or return non-0 return code
when xattr doesn't exist. (Charles Lamb via umamahesh)
HDFS-6696. Name node cannot start if the path of a file under
construction contains ".snapshot". (wang)
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
@ -970,6 +989,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6622. Rename and AddBlock may race and produce invalid edits (kihwal
via cmccabe)
HDFS-6723. New NN webUI no longer displays decommissioned state for dead node.
(Ming Ma via wheat9)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;

View File

@ -1358,6 +1358,6 @@ public interface ClientProtocol {
* @param xAttr <code>XAttr</code> to remove
* @throws IOException
*/
@Idempotent
@AtMostOnce
public void removeXAttr(String src, XAttr xAttr) throws IOException;
}

View File

@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;

View File

@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
private final SortedSet<BlockScanInfo> newBlockInfoSet =
new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
private final GSet<Block, BlockScanInfo> blockMap
= new LightWeightGSet<Block, BlockScanInfo>(
LightWeightGSet.computeCapacity(0.5, "BlockMap"));
@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
//still keep 'info.lastScanType' to NONE.
addBlockInfo(info);
addBlockInfo(info, false);
}
RollingLogs rollingLogs = null;
@ -221,25 +225,42 @@ class BlockPoolSliceScanner {
// Should we change throttler bandwidth every time bytesLeft changes?
// not really required.
}
private synchronized void addBlockInfo(BlockScanInfo info) {
boolean added = blockInfoSet.add(info);
/**
* Add the BlockScanInfo to sorted set of blockScanInfo
* @param info BlockScanInfo to be added
* @param isNewBlock true if the block is the new Block, false if
* BlockScanInfo is being updated with new scanTime
*/
private synchronized void addBlockInfo(BlockScanInfo info,
boolean isNewBlock) {
boolean added = false;
if (isNewBlock) {
// check whether the block already present
boolean exists = blockInfoSet.contains(info);
added = !exists && newBlockInfoSet.add(info);
} else {
added = blockInfoSet.add(info);
}
blockMap.put(info);
if (added) {
updateBytesToScan(info.getNumBytes(), info.lastScanTime);
}
}
private synchronized void delBlockInfo(BlockScanInfo info) {
boolean exists = blockInfoSet.remove(info);
if (!exists){
exists = newBlockInfoSet.remove(info);
}
blockMap.remove(info);
if (exists) {
updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
}
}
/** Update blockMap by the given LogEntry */
private synchronized void updateBlockInfo(LogEntry e) {
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = e.verificationTime;
info.lastScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info);
addBlockInfo(info, false);
}
}
@ -275,14 +296,14 @@ class BlockPoolSliceScanner {
info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
addBlockInfo(info);
addBlockInfo(info, true);
adjustThrottler();
}
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
if (info != null) {
delBlockInfo(info);
}
}
@ -319,7 +340,7 @@ class BlockPoolSliceScanner {
info.lastScanType = type;
info.lastScanTime = now;
info.lastScanOk = scanOk;
addBlockInfo(info);
addBlockInfo(info, false);
// Don't update meta data if the verification failed.
if (!scanOk) {
@ -578,7 +599,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlockInfo(info);
addBlockInfo(info, false);
}
}
}
@ -674,12 +695,21 @@ class BlockPoolSliceScanner {
throw e;
} finally {
rollVerificationLogs();
rollNewBlocksInfo();
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning block pool: " + blockPoolId);
}
}
}
// add new blocks to scan in next iteration
private synchronized void rollNewBlocksInfo() {
for (BlockScanInfo newBlock : newBlockInfoSet) {
blockInfoSet.add(newBlock);
}
newBlockInfoSet.clear();
}
private synchronized void rollVerificationLogs() {
if (verificationLog != null) {
try {

View File

@ -1074,10 +1074,11 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
op.src = src;
op.xAttrs = xAttrs;
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

View File

@ -821,6 +821,10 @@ public class FSEditLogLoader {
RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
removeXAttrOp.xAttrs);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
removeXAttrOp.rpcCallId);
}
break;
}
default:

View File

@ -3551,6 +3551,7 @@ public abstract class FSEditLogOp {
XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
src = p.getSrc();
xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
readRpcIds(in, logVersion);
}
@Override
@ -3561,18 +3562,22 @@ public abstract class FSEditLogOp {
}
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
// clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
appendXAttrsToXml(contentHandler, xAttrs);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC");
xAttrs = readXAttrsFromXml(st);
readRpcIdsFromXml(st);
}
}

View File

@ -614,6 +614,16 @@ public class FSImageFormat {
INodeDirectory parentINode = fsDir.rootDir;
for (long i = 0; i < numFiles; i++) {
pathComponents = FSImageSerialization.readPathComponents(in);
for (int j=0; j < pathComponents.length; j++) {
byte[] newComponent = renameReservedComponentOnUpgrade
(pathComponents[j], getLayoutVersion());
if (!Arrays.equals(newComponent, pathComponents[j])) {
String oldPath = DFSUtil.byteArray2PathString(pathComponents);
pathComponents[j] = newComponent;
String newPath = DFSUtil.byteArray2PathString(pathComponents);
LOG.info("Renaming reserved path " + oldPath + " to " + newPath);
}
}
final INode newNode = loadINode(
pathComponents[pathComponents.length-1], false, in, counter);
@ -926,6 +936,7 @@ public class FSImageFormat {
oldnode = namesystem.dir.getInode(cons.getId()).asFile();
inSnapshot = true;
} else {
path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
final INodesInPath iip = fsDir.getLastINodeInPath(path);
oldnode = INodeFile.valueOf(iip.getINode(0), path);
}

View File

@ -8658,11 +8658,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
nnConf.checkXAttrsConfigFlag();
FSPermissionChecker pc = getPermissionChecker();
boolean getAll = xAttrs == null || xAttrs.isEmpty();
List<XAttr> filteredXAttrs = null;
if (!getAll) {
filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs);
if (filteredXAttrs.isEmpty()) {
return filteredXAttrs;
try {
XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs);
} catch (AccessControlException e) {
logAuditEvent(false, "getXAttrs", src);
throw e;
}
}
checkOperation(OperationCategory.READ);
@ -8681,15 +8682,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (filteredAll == null || filteredAll.isEmpty()) {
return null;
}
List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size());
for (XAttr xAttr : filteredXAttrs) {
List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
for (XAttr xAttr : xAttrs) {
boolean foundIt = false;
for (XAttr a : filteredAll) {
if (xAttr.getNameSpace() == a.getNameSpace()
&& xAttr.getName().equals(a.getName())) {
toGet.add(a);
foundIt = true;
break;
}
}
if (!foundIt) {
throw new IOException(
"At least one of the attributes provided was not found.");
}
}
return toGet;
}
@ -8723,17 +8730,42 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
readUnlock();
}
}
/**
* Remove an xattr for a file or directory.
*
* @param src
* - path to remove the xattr from
* @param xAttr
* - xAttr to remove
* @throws AccessControlException
* @throws SafeModeException
* @throws UnresolvedLinkException
* @throws IOException
*/
void removeXAttr(String src, XAttr xAttr) throws IOException {
nnConf.checkXAttrsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
removeXAttrInt(src, xAttr, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "removeXAttr", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache)
throws IOException {
nnConf.checkXAttrsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
@ -8747,12 +8779,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
xAttrs.add(xAttr);
List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
getEditLog().logRemoveXAttrs(src, removedXAttrs);
getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
} else {
throw new IOException(
"No matching attributes found for remove operation");
}
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "removeXAttr", src);
throw e;
} finally {
writeUnlock();
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.security.AccessControlException;
import com.google.common.collect.Lists;
import com.google.common.base.Preconditions;
/**
* There are four types of extended attributes <XAttr> defined by the
@ -60,8 +61,20 @@ public class XAttrPermissionFilter {
throw new AccessControlException("User doesn't have permission for xattr: "
+ XAttrHelper.getPrefixName(xAttr));
}
static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
static void checkPermissionForApi(FSPermissionChecker pc,
List<XAttr> xAttrs) throws AccessControlException {
Preconditions.checkArgument(xAttrs != null);
if (xAttrs.isEmpty()) {
return;
}
for (XAttr xAttr : xAttrs) {
checkPermissionForApi(pc, xAttr);
}
}
static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
List<XAttr> xAttrs) {
assert xAttrs != null : "xAttrs can not be null";
if (xAttrs == null || xAttrs.isEmpty()) {

View File

@ -113,6 +113,7 @@ import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.Node;
@ -190,7 +191,7 @@ public class NamenodeWebHdfsMethods {
throws IOException {
final NamenodeProtocols np = namenode.getRpcServer();
if (np == null) {
throw new IOException("Namenode is in startup mode");
throw new RetriableException("Namenode is in startup mode");
}
return np;
}

View File

@ -25,8 +25,8 @@ public class XAttrNameParam extends StringParam {
/** Default parameter value. **/
public static final String DEFAULT = "";
private static Domain DOMAIN = new Domain(NAME,
Pattern.compile("^(user\\.|trusted\\.|system\\.|security\\.).+"));
private static Domain DOMAIN = new Domain(NAME,
Pattern.compile(".*"));
public XAttrNameParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);

View File

@ -66,7 +66,6 @@
<div class="row">
<hr />
<div class="col-xs-2"><p>Hadoop, 2014.</p></div>
<div class="col-xs-1 pull-right"><a style="color: #ddd" href="dfshealth.jsp">Legacy UI</a></div>
</div>
</div>
@ -283,7 +282,7 @@
<tr class="danger">
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>Dead{?decomissioned}, Decomissioned{/decomissioned}</td>
<td>Dead{?decommissioned}, Decommissioned{/decommissioned}</td>
<td>-</td>
<td>-</td>
<td>-</td>

View File

@ -18,18 +18,7 @@
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="REFRESH" content="1;url=dfshealth.jsp" />
<meta http-equiv="REFRESH" content="0;url=dfshealth.html" />
<title>Hadoop Administration</title>
</head>
<body>
<script type="text/javascript">
//<![CDATA[
window.location.href='dfshealth.html';
//]]>
</script>
<h1>Hadoop Administration</h1>
<ul>
<li><a href="dfshealth.jsp">DFS Health/Status</a></li>
</ul>
</body>
</html>

View File

@ -21,15 +21,4 @@
<meta http-equiv="REFRESH" content="0;url=status.html" />
<title>Hadoop Administration</title>
</head>
<body>
<script type="text/javascript">
//<![CDATA[
window.location.href='status.html';
//]]>
</script>
<h1>Hadoop Administration</h1>
<ul>
<li><a href="status.jsp">Status</a></li>
</ul>
</body>
</html>

View File

@ -2653,6 +2653,75 @@ public class TestDFSShell {
}
}
/*
* 1. Test that CLI throws an exception and returns non-0 when user does
* not have permission to read an xattr.
* 2. Test that CLI throws an exception and returns non-0 when a non-existent
* xattr is requested.
*/
@Test (timeout = 120000)
public void testGetFAttrErrors() throws Exception {
final UserGroupInformation user = UserGroupInformation.
createUserForTesting("user", new String[] {"mygroup"});
MiniDFSCluster cluster = null;
PrintStream bakErr = null;
try {
final Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final FileSystem fs = cluster.getFileSystem();
final Path p = new Path("/foo");
fs.mkdirs(p);
bakErr = System.err;
final FsShell fshell = new FsShell(conf);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
// No permission for "other".
fs.setPermission(p, new FsPermission((short) 0700));
{
final int ret = ToolRunner.run(fshell, new String[] {
"-setfattr", "-n", "user.a1", "-v", "1234", "/foo"});
assertEquals("Returned should be 0", 0, ret);
out.reset();
}
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
int ret = ToolRunner.run(fshell, new String[] {
"-getfattr", "-n", "user.a1", "/foo"});
String str = out.toString();
assertTrue("xattr value was incorrectly returned",
str.indexOf("1234") == -1);
out.reset();
return null;
}
});
{
final int ret = ToolRunner.run(fshell, new String[]{
"-getfattr", "-n", "user.nonexistent", "/foo"});
String str = out.toString();
assertTrue("xattr value was incorrectly returned",
str.indexOf(
"getfattr: At least one of the attributes provided was not found")
>= 0);
out.reset();
}
} finally {
if (bakErr != null) {
System.setErr(bakErr);
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test that the server trash configuration is respected when
* the client configuration is not set.

View File

@ -70,6 +70,9 @@ public class TestDFSUpgradeFromImage {
private static final String HADOOP_DFS_DIR_TXT = "hadoop-dfs-dir.txt";
private static final String HADOOP22_IMAGE = "hadoop-22-dfs-dir.tgz";
private static final String HADOOP1_BBW_IMAGE = "hadoop1-bbw.tgz";
private static final String HADOOP1_RESERVED_IMAGE = "hadoop-1-reserved.tgz";
private static final String HADOOP023_RESERVED_IMAGE =
"hadoop-0.23-reserved.tgz";
private static final String HADOOP2_RESERVED_IMAGE = "hadoop-2-reserved.tgz";
private static class ReferenceFileInfo {
@ -325,6 +328,140 @@ public class TestDFSUpgradeFromImage {
}
}
/**
* Test upgrade from a branch-1.2 image with reserved paths
*/
@Test
public void testUpgradeFromRel1ReservedImage() throws Exception {
unpackStorage(HADOOP1_RESERVED_IMAGE);
MiniDFSCluster cluster = null;
// Try it once without setting the upgrade flag to ensure it fails
final Configuration conf = new Configuration();
// Try it again with a custom rename string
try {
FSImageFormat.setRenameReservedPairs(
".snapshot=.user-snapshot," +
".reserved=.my-reserved");
cluster =
new MiniDFSCluster.Builder(conf)
.format(false)
.startupOption(StartupOption.UPGRADE)
.numDataNodes(0).build();
DistributedFileSystem dfs = cluster.getFileSystem();
// Make sure the paths were renamed as expected
// Also check that paths are present after a restart, checks that the
// upgraded fsimage has the same state.
final String[] expected = new String[] {
"/.my-reserved",
"/.user-snapshot",
"/.user-snapshot/.user-snapshot",
"/.user-snapshot/open",
"/dir1",
"/dir1/.user-snapshot",
"/dir2",
"/dir2/.user-snapshot",
"/user",
"/user/andrew",
"/user/andrew/.user-snapshot",
};
for (int i=0; i<2; i++) {
// Restart the second time through this loop
if (i==1) {
cluster.finalizeCluster(conf);
cluster.restartNameNode(true);
}
ArrayList<Path> toList = new ArrayList<Path>();
toList.add(new Path("/"));
ArrayList<String> found = new ArrayList<String>();
while (!toList.isEmpty()) {
Path p = toList.remove(0);
FileStatus[] statuses = dfs.listStatus(p);
for (FileStatus status: statuses) {
final String path = status.getPath().toUri().getPath();
System.out.println("Found path " + path);
found.add(path);
if (status.isDirectory()) {
toList.add(status.getPath());
}
}
}
for (String s: expected) {
assertTrue("Did not find expected path " + s, found.contains(s));
}
assertEquals("Found an unexpected path while listing filesystem",
found.size(), expected.length);
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test upgrade from a 0.23.11 image with reserved paths
*/
@Test
public void testUpgradeFromRel023ReservedImage() throws Exception {
unpackStorage(HADOOP023_RESERVED_IMAGE);
MiniDFSCluster cluster = null;
// Try it once without setting the upgrade flag to ensure it fails
final Configuration conf = new Configuration();
// Try it again with a custom rename string
try {
FSImageFormat.setRenameReservedPairs(
".snapshot=.user-snapshot," +
".reserved=.my-reserved");
cluster =
new MiniDFSCluster.Builder(conf)
.format(false)
.startupOption(StartupOption.UPGRADE)
.numDataNodes(0).build();
DistributedFileSystem dfs = cluster.getFileSystem();
// Make sure the paths were renamed as expected
// Also check that paths are present after a restart, checks that the
// upgraded fsimage has the same state.
final String[] expected = new String[] {
"/.user-snapshot",
"/dir1",
"/dir1/.user-snapshot",
"/dir2",
"/dir2/.user-snapshot"
};
for (int i=0; i<2; i++) {
// Restart the second time through this loop
if (i==1) {
cluster.finalizeCluster(conf);
cluster.restartNameNode(true);
}
ArrayList<Path> toList = new ArrayList<Path>();
toList.add(new Path("/"));
ArrayList<String> found = new ArrayList<String>();
while (!toList.isEmpty()) {
Path p = toList.remove(0);
FileStatus[] statuses = dfs.listStatus(p);
for (FileStatus status: statuses) {
final String path = status.getPath().toUri().getPath();
System.out.println("Found path " + path);
found.add(path);
if (status.isDirectory()) {
toList.add(status.getPath());
}
}
}
for (String s: expected) {
assertTrue("Did not find expected path " + s, found.contains(s));
}
assertEquals("Found an unexpected path while listing filesystem",
found.size(), expected.length);
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test upgrade from 2.0 image with a variety of .snapshot and .reserved
* paths to test renaming on upgrade

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.List;
@ -46,6 +45,7 @@ import static org.apache.hadoop.fs.permission.AclEntryType.USER;
import static org.apache.hadoop.fs.permission.FsAction.ALL;
import static org.apache.hadoop.fs.permission.FsAction.READ;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.After;
@ -261,11 +261,12 @@ public class FSXAttrBaseTest {
fs.setXAttr(path, "user.", value1, EnumSet.of(XAttrSetFlag.CREATE,
XAttrSetFlag.REPLACE));
Assert.fail("Setting xattr with empty name should fail.");
} catch (RemoteException e) {
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
HadoopIllegalArgumentException.class.getCanonicalName());
GenericTestUtils.assertExceptionContains("XAttr name cannot be empty", e);
} catch (HadoopIllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("XAttr name cannot be empty", e);
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Invalid value: \"user.\" does " +
"not belong to the domain ^(user\\.|trusted\\.|system\\.|security\\.).+", e);
}
// Set xattr with invalid name: "a1"
@ -274,11 +275,12 @@ public class FSXAttrBaseTest {
XAttrSetFlag.REPLACE));
Assert.fail("Setting xattr with invalid name prefix or without " +
"name prefix should fail.");
} catch (RemoteException e) {
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
HadoopIllegalArgumentException.class.getCanonicalName());
GenericTestUtils.assertExceptionContains("XAttr name must be prefixed", e);
} catch (HadoopIllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("XAttr name must be prefixed", e);
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Invalid value: \"a1\" does " +
"not belong to the domain ^(user\\.|trusted\\.|system\\.|security\\.).+", e);
}
// Set xattr without XAttrSetFlag
@ -341,9 +343,18 @@ public class FSXAttrBaseTest {
}
/**
* Tests for getting xattr
* 1. To get xattr which does not exist.
* 2. To get multiple xattrs.
* getxattr tests. Test that getxattr throws an exception if any of
* the following are true:
* an xattr that was requested doesn't exist
* the caller specifies an unknown namespace
* the caller doesn't have access to the namespace
* the caller doesn't have permission to get the value of the xattr
* the caller does not have search access to the parent directory
* the caller has only read access to the owning directory
* the caller has only search access to the owning directory and
* execute/search access to the actual entity
* the caller does not have search access to the owning directory and read
* access to the actual entity
*/
@Test(timeout = 120000)
public void testGetXAttrs() throws Exception {
@ -351,21 +362,159 @@ public class FSXAttrBaseTest {
fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
// XAttr does not exist.
byte[] value = fs.getXAttr(path, name3);
Assert.assertEquals(value, null);
/* An XAttr that was requested does not exist. */
try {
final byte[] value = fs.getXAttr(path, name3);
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"At least one of the attributes provided was not found.", e);
}
List<String> names = Lists.newArrayList();
names.add(name1);
names.add(name2);
names.add(name3);
Map<String, byte[]> xattrs = fs.getXAttrs(path, names);
Assert.assertEquals(xattrs.size(), 2);
Assert.assertArrayEquals(value1, xattrs.get(name1));
Assert.assertArrayEquals(value2, xattrs.get(name2));
/* Throw an exception if an xattr that was requested does not exist. */
{
final List<String> names = Lists.newArrayList();
names.add(name1);
names.add(name2);
names.add(name3);
try {
final Map<String, byte[]> xattrs = fs.getXAttrs(path, names);
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"At least one of the attributes provided was not found.", e);
}
}
fs.removeXAttr(path, name1);
fs.removeXAttr(path, name2);
/* Unknown namespace should throw an exception. */
try {
final byte[] xattr = fs.getXAttr(path, "wackynamespace.foo");
Assert.fail("expected IOException");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains
("An XAttr name must be prefixed with user/trusted/security/system, " +
"followed by a '.'",
e);
}
/*
* The 'trusted' namespace should not be accessible and should throw an
* exception.
*/
final UserGroupInformation user = UserGroupInformation.
createUserForTesting("user", new String[] {"mygroup"});
fs.setXAttr(path, "trusted.foo", "1234".getBytes());
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(path, "trusted.foo");
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("User doesn't have permission", e);
}
fs.setXAttr(path, name1, "1234".getBytes());
/*
* Test that an exception is thrown if the caller doesn't have permission to
* get the value of the xattr.
*/
/* Set access so that only the owner has access. */
fs.setPermission(path, new FsPermission((short) 0700));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(path, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* The caller must have search access to the parent directory.
*/
final Path childDir = new Path(path, "child" + pathCount);
/* Set access to parent so that only the owner has access. */
FileSystem.mkdirs(fs, childDir, FsPermission.createImmutable((short)0700));
fs.setXAttr(childDir, name1, "1234".getBytes());
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/* Check that read access to the owning directory is not good enough. */
fs.setPermission(path, new FsPermission((short) 0704));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* Check that search access to the owning directory and search/execute
* access to the actual entity with extended attributes is not good enough.
*/
fs.setPermission(path, new FsPermission((short) 0701));
fs.setPermission(childDir, new FsPermission((short) 0701));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* Check that search access to the owning directory and read access to
* the actual entity with the extended attribute is good enough.
*/
fs.setPermission(path, new FsPermission((short) 0701));
fs.setPermission(childDir, new FsPermission((short) 0704));
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
final byte[] xattr = userFs.getXAttr(childDir, name1);
return null;
}
});
}
/**
@ -402,6 +551,166 @@ public class FSXAttrBaseTest {
fs.removeXAttr(path, name3);
}
/**
* removexattr tests. Test that removexattr throws an exception if any of
* the following are true:
* an xattr that was requested doesn't exist
* the caller specifies an unknown namespace
* the caller doesn't have access to the namespace
* the caller doesn't have permission to get the value of the xattr
* the caller does not have "execute" (scan) access to the parent directory
* the caller has only read access to the owning directory
* the caller has only execute access to the owning directory and execute
* access to the actual entity
* the caller does not have execute access to the owning directory and write
* access to the actual entity
*/
@Test(timeout = 120000)
public void testRemoveXAttrPermissions() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
fs.setXAttr(path, name3, null, EnumSet.of(XAttrSetFlag.CREATE));
try {
fs.removeXAttr(path, name2);
fs.removeXAttr(path, name2);
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No matching attributes found", e);
}
/* Unknown namespace should throw an exception. */
final String expectedExceptionString = "An XAttr name must be prefixed " +
"with user/trusted/security/system, followed by a '.'";
try {
fs.removeXAttr(path, "wackynamespace.foo");
Assert.fail("expected IOException");
} catch (RemoteException e) {
assertEquals("Unexpected RemoteException: " + e, e.getClassName(),
HadoopIllegalArgumentException.class.getCanonicalName());
GenericTestUtils.assertExceptionContains(expectedExceptionString, e);
} catch (HadoopIllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(expectedExceptionString, e);
}
/*
* The 'trusted' namespace should not be accessible and should throw an
* exception.
*/
final UserGroupInformation user = UserGroupInformation.
createUserForTesting("user", new String[] {"mygroup"});
fs.setXAttr(path, "trusted.foo", "1234".getBytes());
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(path, "trusted.foo");
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("User doesn't have permission", e);
} finally {
fs.removeXAttr(path, "trusted.foo");
}
/*
* Test that an exception is thrown if the caller doesn't have permission to
* get the value of the xattr.
*/
/* Set access so that only the owner has access. */
fs.setPermission(path, new FsPermission((short) 0700));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(path, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* The caller must have "execute" (scan) access to the parent directory.
*/
final Path childDir = new Path(path, "child" + pathCount);
/* Set access to parent so that only the owner has access. */
FileSystem.mkdirs(fs, childDir, FsPermission.createImmutable((short)0700));
fs.setXAttr(childDir, name1, "1234".getBytes());
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/* Check that read access to the owning directory is not good enough. */
fs.setPermission(path, new FsPermission((short) 0704));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* Check that execute access to the owning directory and scan access to
* the actual entity with extended attributes is not good enough.
*/
fs.setPermission(path, new FsPermission((short) 0701));
fs.setPermission(childDir, new FsPermission((short) 0701));
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(childDir, name1);
return null;
}
});
Assert.fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
/*
* Check that execute access to the owning directory and write access to
* the actual entity with extended attributes is good enough.
*/
fs.setPermission(path, new FsPermission((short) 0701));
fs.setPermission(childDir, new FsPermission((short) 0706));
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final FileSystem userFs = dfsCluster.getFileSystem();
userFs.removeXAttr(childDir, name1);
return null;
}
});
}
@Test(timeout = 120000)
public void testRenameFileWithXAttr() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));

View File

@ -416,7 +416,7 @@ public class TestNamenodeRetryCache {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(22, cacheSet.size());
assertEquals(23, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -435,7 +435,7 @@ public class TestNamenodeRetryCache {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(22, cacheSet.size());
assertEquals(23, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();

View File

@ -160,7 +160,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(22, cacheSet.size());
assertEquals(23, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -181,7 +181,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(22, cacheSet.size());
assertEquals(23, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@ -1047,6 +1047,49 @@ public class TestRetryCacheWithHA {
}
}
/** removeXAttr */
class RemoveXAttrOp extends AtMostOnceOp {
private final String src;
RemoveXAttrOp(DFSClient client, String src) {
super("removeXAttr", client);
this.src = src;
}
@Override
void prepare() throws Exception {
Path p = new Path(src);
if (!dfs.exists(p)) {
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE));
}
}
@Override
void invoke() throws Exception {
client.removeXAttr(src, "user.key");
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
Set<String> keySet = iter.keySet();
if (!keySet.contains("user.key")) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
@Test (timeout=60000)
public void testCreateSnapshot() throws Exception {
final DFSClient client = genClientWithDummyHandler();
@ -1183,6 +1226,13 @@ public class TestRetryCacheWithHA {
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemoveXAttr() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemoveXAttrOp(client, "/removexattr");
testClientRetryWithFailover(op);
}
/**
* When NN failover happens, if the client did not receive the response and
* send a retry request to the other NN, the same response should be recieved

View File

@ -39,14 +39,18 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
/** Test WebHDFS */
public class TestWebHDFS {
@ -445,4 +449,37 @@ public class TestWebHDFS {
}
}
}
/**
* Make sure a RetriableException is thrown when rpcServer is null in
* NamenodeWebHdfsMethods.
*/
@Test
public void testRaceWhileNNStartup() throws Exception {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final NameNode namenode = cluster.getNameNode();
final NamenodeProtocols rpcServer = namenode.getRpcServer();
Whitebox.setInternalState(namenode, "rpcServer", null);
final Path foo = new Path("/foo");
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsFileSystem.SCHEME);
try {
webHdfs.mkdirs(foo);
fail("Expected RetriableException");
} catch (RetriableException e) {
GenericTestUtils.assertExceptionContains("Namenode is in startup mode",
e);
}
Whitebox.setInternalState(namenode, "rpcServer", rpcServer);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -18,6 +18,15 @@
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -29,18 +38,14 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.mockito.internal.util.reflection.Whitebox;
public class TestWebHDFSForHA {
private static final String LOGICAL_NAME = "minidfs";
@ -182,4 +187,61 @@ public class TestWebHDFSForHA {
}
}
}
/**
* Make sure the WebHdfsFileSystem will retry based on RetriableException when
* rpcServer is null in NamenodeWebHdfsMethods while NameNode starts up.
*/
@Test (timeout=120000)
public void testRetryWhileNNStartup() throws Exception {
final Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
MiniDFSCluster cluster = null;
final Map<String, Boolean> resultMap = new HashMap<String, Boolean>();
try {
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
.numDataNodes(0).build();
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
cluster.waitActive();
cluster.transitionToActive(0);
final NameNode namenode = cluster.getNameNode(0);
final NamenodeProtocols rpcServer = namenode.getRpcServer();
Whitebox.setInternalState(namenode, "rpcServer", null);
new Thread() {
@Override
public void run() {
boolean result = false;
FileSystem fs = null;
try {
fs = FileSystem.get(WEBHDFS_URI, conf);
final Path dir = new Path("/test");
result = fs.mkdirs(dir);
} catch (IOException e) {
result = false;
} finally {
IOUtils.cleanup(null, fs);
}
synchronized (TestWebHDFSForHA.this) {
resultMap.put("mkdirs", result);
TestWebHDFSForHA.this.notifyAll();
}
}
}.start();
Thread.sleep(1000);
Whitebox.setInternalState(namenode, "rpcServer", rpcServer);
synchronized (this) {
while (!resultMap.containsKey("mkdirs")) {
this.wait();
}
Assert.assertTrue(resultMap.get("mkdirs"));
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -355,12 +355,6 @@ public class TestParam {
public void testXAttrNameParam() {
final XAttrNameParam p = new XAttrNameParam("user.a1");
Assert.assertEquals(p.getXAttrName(), "user.a1");
try {
new XAttrNameParam("a1");
Assert.fail();
} catch (IllegalArgumentException e) {
LOG.info("EXPECTED: " + e);
}
}
@Test

View File

@ -986,6 +986,8 @@
<NAMESPACE>USER</NAMESPACE>
<NAME>a2</NAME>
</XATTR>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>82</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>

View File

@ -62,6 +62,9 @@ Release 2.6.0 - UNRELEASED
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
(Li Lu via jianhe)
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
devaraj)
OPTIMIZATIONS
BUG FIXES
@ -94,6 +97,9 @@ Release 2.6.0 - UNRELEASED
YARN-2313. Livelock can occur in FairScheduler when there are lots of
running apps (Tsuyoshi Ozawa via Sandy Ryza)
YARN-2147. client lacks delegation token exception details when
application submit fails (Chen He via jlowe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -300,6 +306,9 @@ Release 2.5.0 - UNRELEASED
YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and
caused a task timeout for 30mins. (Sunil G via mayank)
YARN-2300. Improved the documentation of the sample requests for RM REST API -
submitting an app. (Varun Vasudev via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -43,7 +43,7 @@ public class BaseContainerTokenSecretManager extends
private static Log LOG = LogFactory
.getLog(BaseContainerTokenSecretManager.class);
private int serialNo = new SecureRandom().nextInt();
protected int serialNo = new SecureRandom().nextInt();
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
protected final Lock readLock = readWriteLock.readLock();

View File

@ -173,8 +173,8 @@ public class NodeManager extends CompositeService
NMContainerTokenSecretManager containerTokenSecretManager)
throws IOException {
if (nmStore.canRecover()) {
nmTokenSecretManager.recover(nmStore.loadNMTokenState());
// TODO: recover containerTokenSecretManager
nmTokenSecretManager.recover();
containerTokenSecretManager.recover();
}
}
@ -190,7 +190,7 @@ public class NodeManager extends CompositeService
initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager =
new NMContainerTokenSecretManager(conf);
new NMContainerTokenSecretManager(conf, nmStore);
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM(nmStore);

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
@ -90,6 +91,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
private static final String NM_TOKENS_PREV_MASTER_KEY =
NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
private static final String CONTAINER_TOKENS_KEY_PREFIX =
"ContainerTokens/";
private static final String CONTAINER_TOKENS_CURRENT_MASTER_KEY =
CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
private DB db;
@ -141,7 +148,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
key.substring(0, userEndPos+1)));
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
@ -260,7 +267,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), proto.toByteArray());
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@ -283,7 +290,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@ -306,7 +313,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@ -355,7 +362,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
@ -371,7 +378,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), taskProto.toByteArray());
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@ -381,14 +388,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@Override
public RecoveredNMTokenState loadNMTokenState() throws IOException {
RecoveredNMTokenState state = new RecoveredNMTokenState();
public RecoveredNMTokensState loadNMTokensState() throws IOException {
RecoveredNMTokensState state = new RecoveredNMTokensState();
state.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>();
LeveldbIterator iter = null;
@ -420,7 +427,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
@ -454,7 +461,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@ -468,7 +475,91 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(dbKey), pb.getProto().toByteArray());
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}
@Override
public RecoveredContainerTokensState loadContainerTokensState()
throws IOException {
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
state.activeTokens = new HashMap<ContainerId, Long>();
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
final int containerTokensKeyPrefixLength =
CONTAINER_TOKENS_KEY_PREFIX.length();
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String fullKey = asString(entry.getKey());
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
break;
}
String key = fullKey.substring(containerTokensKeyPrefixLength);
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
state.currentMasterKey = parseMasterKey(entry.getValue());
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
state.previousMasterKey = parseMasterKey(entry.getValue());
} else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
loadContainerToken(state, fullKey, key, entry.getValue());
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return state;
}
private static void loadContainerToken(RecoveredContainerTokensState state,
String key, String containerIdStr, byte[] value) throws IOException {
ContainerId containerId;
Long expTime;
try {
containerId = ConverterUtils.toContainerId(containerIdStr);
expTime = Long.parseLong(asString(value));
} catch (IllegalArgumentException e) {
throw new IOException("Bad container token state for " + key, e);
}
state.activeTokens.put(containerId, expTime);
}
@Override
public void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException {
storeMasterKey(CONTAINER_TOKENS_CURRENT_MASTER_KEY, key);
}
@Override
public void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException {
storeMasterKey(CONTAINER_TOKENS_PREV_MASTER_KEY, key);
}
@Override
public void storeContainerToken(ContainerId containerId, Long expTime)
throws IOException {
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
try {
db.put(bytes(key), bytes(expTime.toString()));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeContainerToken(ContainerId containerId)
throws IOException {
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
try {
db.delete(bytes(key));
} catch (DBException e) {
throw new IOException(e);
}
}
@ -554,7 +645,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
throw new IOException(e);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@ -80,7 +81,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
public RecoveredNMTokenState loadNMTokenState() throws IOException {
public RecoveredNMTokensState loadNMTokensState() throws IOException {
throw new UnsupportedOperationException(
"Recovery not supported by this state store");
}
@ -105,6 +106,33 @@ public class NMNullStateStoreService extends NMStateStoreService {
throws IOException {
}
@Override
public RecoveredContainerTokensState loadContainerTokensState()
throws IOException {
throw new UnsupportedOperationException(
"Recovery not supported by this state store");
}
@Override
public void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException {
}
@Override
public void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException {
}
@Override
public void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException {
}
@Override
public void removeContainerToken(ContainerId containerId)
throws IOException {
}
@Override
protected void initStorage(Configuration conf) throws IOException {
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@ -102,7 +103,7 @@ public abstract class NMStateStoreService extends AbstractService {
}
}
public static class RecoveredNMTokenState {
public static class RecoveredNMTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
@ -120,6 +121,24 @@ public abstract class NMStateStoreService extends AbstractService {
}
}
public static class RecoveredContainerTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
Map<ContainerId, Long> activeTokens;
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
}
public MasterKey getPreviousMasterKey() {
return previousMasterKey;
}
public Map<ContainerId, Long> getActiveTokens() {
return activeTokens;
}
}
/** Initialize the state storage */
@Override
public void serviceInit(Configuration conf) throws IOException {
@ -193,7 +212,8 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void removeDeletionTask(int taskId) throws IOException;
public abstract RecoveredNMTokenState loadNMTokenState() throws IOException;
public abstract RecoveredNMTokensState loadNMTokensState()
throws IOException;
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
throws IOException;
@ -208,6 +228,22 @@ public abstract class NMStateStoreService extends AbstractService {
ApplicationAttemptId attempt) throws IOException;
public abstract RecoveredContainerTokensState loadContainerTokensState()
throws IOException;
public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException;
public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException;
public abstract void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException;
public abstract void removeContainerToken(ContainerId containerId)
throws IOException;
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@ -49,14 +53,74 @@ public class NMContainerTokenSecretManager extends
private MasterKeyData previousMasterKey;
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
private final NMStateStoreService stateStore;
private String nodeHostAddr;
public NMContainerTokenSecretManager(Configuration conf) {
this(conf, new NMNullStateStoreService());
}
public NMContainerTokenSecretManager(Configuration conf,
NMStateStoreService stateStore) {
super(conf);
recentlyStartedContainerTracker =
new TreeMap<Long, List<ContainerId>>();
this.stateStore = stateStore;
}
public synchronized void recover()
throws IOException {
RecoveredContainerTokensState state =
stateStore.loadContainerTokensState();
MasterKey key = state.getCurrentMasterKey();
if (key != null) {
super.currentMasterKey =
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
}
key = state.getPreviousMasterKey();
if (key != null) {
previousMasterKey =
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
}
// restore the serial number from the current master key
if (super.currentMasterKey != null) {
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
}
for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
ContainerId containerId = entry.getKey();
Long expTime = entry.getValue();
List<ContainerId> containerList =
recentlyStartedContainerTracker.get(expTime);
if (containerList == null) {
containerList = new ArrayList<ContainerId>();
recentlyStartedContainerTracker.put(expTime, containerList);
}
if (!containerList.contains(containerId)) {
containerList.add(containerId);
}
}
}
private void updateCurrentMasterKey(MasterKeyData key) {
super.currentMasterKey = key;
try {
stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey());
} catch (IOException e) {
LOG.error("Unable to update current master key in state store", e);
}
}
private void updatePreviousMasterKey(MasterKeyData key) {
previousMasterKey = key;
try {
stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey());
} catch (IOException e) {
LOG.error("Unable to update previous master key in state store", e);
}
}
/**
@ -68,21 +132,16 @@ public class NMContainerTokenSecretManager extends
*/
@Private
public synchronized void setMasterKey(MasterKey masterKeyRecord) {
LOG.info("Rolling master-key for container-tokens, got key with id "
+ masterKeyRecord.getKeyId());
if (super.currentMasterKey == null) {
super.currentMasterKey =
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
.getBytes().array()));
} else {
if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
.getKeyId()) {
// Update keys only if the key has changed.
this.previousMasterKey = super.currentMasterKey;
super.currentMasterKey =
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
.getBytes().array()));
// Update keys only if the key has changed.
if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
.getKeyId() != masterKeyRecord.getKeyId()) {
LOG.info("Rolling master-key for container-tokens, got key with id "
+ masterKeyRecord.getKeyId());
if (super.currentMasterKey != null) {
updatePreviousMasterKey(super.currentMasterKey);
}
updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
createSecretKey(masterKeyRecord.getBytes().array())));
}
}
@ -137,14 +196,19 @@ public class NMContainerTokenSecretManager extends
removeAnyContainerTokenIfExpired();
ContainerId containerId = tokenId.getContainerID();
Long expTime = tokenId.getExpiryTimeStamp();
// We might have multiple containers with same expiration time.
if (!recentlyStartedContainerTracker.containsKey(expTime)) {
recentlyStartedContainerTracker
.put(expTime, new ArrayList<ContainerId>());
}
recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
recentlyStartedContainerTracker.get(expTime).add(containerId);
try {
stateStore.storeContainerToken(containerId, expTime);
} catch (IOException e) {
LOG.error("Unable to store token for container " + containerId, e);
}
}
protected synchronized void removeAnyContainerTokenIfExpired() {
@ -155,6 +219,13 @@ public class NMContainerTokenSecretManager extends
while (containersI.hasNext()) {
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
if (containerEntry.getKey() < currTime) {
for (ContainerId container : containerEntry.getValue()) {
try {
stateStore.removeContainerToken(container);
} catch (IOException e) {
LOG.error("Unable to remove token for container " + container, e);
}
}
containersI.remove();
} else {
break;

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@ -64,8 +64,9 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
this.stateStore = stateStore;
}
public synchronized void recover(RecoveredNMTokenState state)
public synchronized void recover()
throws IOException {
RecoveredNMTokensState state = stateStore.loadNMTokensState();
MasterKey key = state.getCurrentMasterKey();
if (key != null) {
super.currentMasterKey =

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@ -36,7 +37,8 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<TrackerKey, TrackerState> trackerStates;
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokenState nmTokenState;
private RecoveredNMTokensState nmTokenState;
private RecoveredContainerTokensState containerTokenState;
public NMMemoryStateStoreService() {
super(NMMemoryStateStoreService.class.getName());
@ -117,12 +119,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
protected void initStorage(Configuration conf) {
nmTokenState = new RecoveredNMTokenState();
nmTokenState = new RecoveredNMTokensState();
nmTokenState.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>();
containerTokenState = new RecoveredContainerTokensState();
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
trackerStates = new HashMap<TrackerKey, TrackerState>();
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
}
@Override
@ -157,9 +160,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public RecoveredNMTokenState loadNMTokenState() throws IOException {
public RecoveredNMTokensState loadNMTokensState() throws IOException {
// return a copy so caller can't modify our state
RecoveredNMTokenState result = new RecoveredNMTokenState();
RecoveredNMTokensState result = new RecoveredNMTokensState();
result.currentMasterKey = nmTokenState.currentMasterKey;
result.previousMasterKey = nmTokenState.previousMasterKey;
result.applicationMasterKeys =
@ -197,6 +200,48 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
public RecoveredContainerTokensState loadContainerTokensState()
throws IOException {
// return a copy so caller can't modify our state
RecoveredContainerTokensState result =
new RecoveredContainerTokensState();
result.currentMasterKey = containerTokenState.currentMasterKey;
result.previousMasterKey = containerTokenState.previousMasterKey;
result.activeTokens =
new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
return result;
}
@Override
public void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
containerTokenState.currentMasterKey =
new MasterKeyPBImpl(keypb.getProto());
}
@Override
public void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
containerTokenState.previousMasterKey =
new MasterKeyPBImpl(keypb.getProto());
}
@Override
public void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException {
containerTokenState.activeTokens.put(containerId, expirationTime);
}
@Override
public void removeContainerToken(ContainerId containerId)
throws IOException {
containerTokenState.activeTokens.remove(containerId);
}
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();

View File

@ -27,11 +27,13 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -42,12 +44,15 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
@ -502,7 +507,7 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testNMTokenStorage() throws IOException {
// test empty when no state
RecoveredNMTokenState state = stateStore.loadNMTokenState();
RecoveredNMTokensState state = stateStore.loadNMTokensState();
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
@ -512,7 +517,7 @@ public class TestNMLeveldbStateStoreService {
MasterKey currentKey = secretMgr.generateKey();
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokenState();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
@ -521,7 +526,7 @@ public class TestNMLeveldbStateStoreService {
MasterKey prevKey = secretMgr.generateKey();
stateStore.storeNMTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadNMTokenState();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
@ -536,7 +541,7 @@ public class TestNMLeveldbStateStoreService {
MasterKey attemptKey2 = secretMgr.generateKey();
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
restartStateStore();
state = stateStore.loadNMTokenState();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
@ -558,7 +563,7 @@ public class TestNMLeveldbStateStoreService {
currentKey = secretMgr.generateKey();
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokenState();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedAppKeys = state.getApplicationMasterKeys();
@ -568,10 +573,89 @@ public class TestNMLeveldbStateStoreService {
assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
}
@Test
public void testContainerTokenStorage() throws IOException {
// test empty when no state
RecoveredContainerTokensState state =
stateStore.loadContainerTokensState();
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a master key and verify recovered
ContainerTokenKeyGeneratorForTest keygen =
new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
MasterKey currentKey = keygen.generateKey();
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = keygen.generateKey();
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a few container tokens and verify recovered
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
Long expTime1 = 1234567890L;
ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
Long expTime2 = 9876543210L;
stateStore.storeContainerToken(cid1, expTime1);
stateStore.storeContainerToken(cid2, expTime2);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ContainerId, Long> loadedActiveTokens =
state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertEquals(expTime1, loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
// add/update/remove tokens and verify recovered
ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
Long expTime3 = 135798642L;
stateStore.storeContainerToken(cid3, expTime3);
stateStore.removeContainerToken(cid1);
expTime2 += 246897531L;
stateStore.storeContainerToken(cid2, expTime2);
prevKey = currentKey;
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
currentKey = keygen.generateKey();
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedActiveTokens = state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertNull(loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
assertEquals(expTime3, loadedActiveTokens.get(cid3));
}
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
private static class ContainerTokenKeyGeneratorForTest extends
BaseContainerTokenSecretManager {
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
super(conf);
}
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.security;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Test;
public class TestNMContainerTokenSecretManager {
@Test
public void testRecovery() throws IOException {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
final NodeId nodeId = NodeId.newInstance("somehost", 1234);
final ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
final ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
ContainerTokenKeyGeneratorForTest keygen =
new ContainerTokenKeyGeneratorForTest(conf);
NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
NMContainerTokenSecretManager secretMgr =
new NMContainerTokenSecretManager(conf, stateStore);
secretMgr.setNodeId(nodeId);
MasterKey currentKey = keygen.generateKey();
secretMgr.setMasterKey(currentKey);
ContainerTokenIdentifier tokenId1 =
createContainerTokenId(cid1, nodeId, "user1", secretMgr);
ContainerTokenIdentifier tokenId2 =
createContainerTokenId(cid2, nodeId, "user2", secretMgr);
assertNotNull(secretMgr.retrievePassword(tokenId1));
assertNotNull(secretMgr.retrievePassword(tokenId2));
// restart and verify tokens still valid
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
secretMgr.setNodeId(nodeId);
secretMgr.recover();
assertEquals(currentKey, secretMgr.getCurrentKey());
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
assertTrue(secretMgr.isValidStartContainerRequest(tokenId2));
assertNotNull(secretMgr.retrievePassword(tokenId1));
assertNotNull(secretMgr.retrievePassword(tokenId2));
// roll master key and start a container
secretMgr.startContainerSuccessful(tokenId2);
currentKey = keygen.generateKey();
secretMgr.setMasterKey(currentKey);
// restart and verify tokens still valid due to prev key persist
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
secretMgr.setNodeId(nodeId);
secretMgr.recover();
assertEquals(currentKey, secretMgr.getCurrentKey());
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
assertNotNull(secretMgr.retrievePassword(tokenId1));
assertNotNull(secretMgr.retrievePassword(tokenId2));
// roll master key again, restart, and verify keys no longer valid
currentKey = keygen.generateKey();
secretMgr.setMasterKey(currentKey);
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
secretMgr.setNodeId(nodeId);
secretMgr.recover();
assertEquals(currentKey, secretMgr.getCurrentKey());
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
try {
secretMgr.retrievePassword(tokenId1);
fail("token should not be valid");
} catch (InvalidToken e) {
// expected
}
try {
secretMgr.retrievePassword(tokenId2);
fail("token should not be valid");
} catch (InvalidToken e) {
// expected
}
stateStore.close();
}
private static ContainerTokenIdentifier createContainerTokenId(
ContainerId cid, NodeId nodeId, String user,
NMContainerTokenSecretManager secretMgr) throws IOException {
long rmid = cid.getApplicationAttemptId().getApplicationId()
.getClusterTimestamp();
ContainerTokenIdentifier ctid = new ContainerTokenIdentifier(cid,
nodeId.toString(), user, BuilderUtils.newResource(1024, 1),
System.currentTimeMillis() + 100000L,
secretMgr.getCurrentKey().getKeyId(), rmid,
Priority.newInstance(0), 0);
Token token = BuilderUtils.newContainerToken(nodeId,
secretMgr.createPassword(ctid), ctid);
return BuilderUtils.newContainerTokenIdentifier(token);
}
private static class ContainerTokenKeyGeneratorForTest extends
BaseContainerTokenSecretManager {
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
super(conf);
}
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
}

View File

@ -73,7 +73,7 @@ public class TestNMTokenSecretManagerInNM {
// restart and verify key is still there and token still valid
secretMgr = new NMTokenSecretManagerInNM(stateStore);
secretMgr.recover(stateStore.loadNMTokenState());
secretMgr.recover();
secretMgr.setNodeId(nodeId);
assertEquals(currentKey, secretMgr.getCurrentKey());
assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
@ -88,7 +88,7 @@ public class TestNMTokenSecretManagerInNM {
// restart and verify attempt1 key is still valid due to prev key persist
secretMgr = new NMTokenSecretManagerInNM(stateStore);
secretMgr.recover(stateStore.loadNMTokenState());
secretMgr.recover();
secretMgr.setNodeId(nodeId);
assertEquals(currentKey, secretMgr.getCurrentKey());
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
@ -101,7 +101,7 @@ public class TestNMTokenSecretManagerInNM {
currentKey = keygen.generateKey();
secretMgr.setMasterKey(currentKey);
secretMgr = new NMTokenSecretManagerInNM(stateStore);
secretMgr.recover(stateStore.loadNMTokenState());
secretMgr.recover();
secretMgr.setNodeId(nodeId);
assertEquals(currentKey, secretMgr.getCurrentKey());
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
@ -117,7 +117,7 @@ public class TestNMTokenSecretManagerInNM {
// remove last attempt, restart, verify both tokens are now bad
secretMgr.appFinished(attempt2.getApplicationId());
secretMgr = new NMTokenSecretManagerInNM(stateStore);
secretMgr.recover(stateStore.loadNMTokenState());
secretMgr.recover();
secretMgr.setNodeId(nodeId);
assertEquals(currentKey, secretMgr.getCurrentKey());
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));

View File

@ -388,7 +388,11 @@ public class DelegationTokenRenewer extends AbstractService {
// If user provides incorrect token then it should not be added for
// renewal.
for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
try {
renewToken(dtr);
} catch (IOException ioe) {
throw new IOException("Failed to renew token: " + dtr.token, ioe);
}
}
for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -673,7 +674,40 @@ public class TestDelegationTokenRenewer {
Thread.sleep(200);
}
}
@Test(timeout=20000)
public void testDTRonAppSubmission()
throws IOException, InterruptedException, BrokenBarrierException {
final Credentials credsx = new Credentials();
final Token<?> tokenx = mock(Token.class);
credsx.addToken(new Text("token"), tokenx);
doReturn(true).when(tokenx).isManaged();
doThrow(new IOException("boom"))
.when(tokenx).renew(any(Configuration.class));
// fire up the renewer
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
dtr.setRMContext(mockContext);
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
dtr.init(conf);
dtr.start();
try {
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
fail("Catch IOException on app submission");
} catch (IOException e){
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
Assert.assertTrue(e.getCause().toString().contains("boom"));
}
}
@Test(timeout=20000)
public void testConcurrentAddApplication()
throws IOException, InterruptedException, BrokenBarrierException {

View File

@ -2228,68 +2228,48 @@ _01_000001</amContainerLogs>
{
"application-id":"application_1404203615263_0001",
"application-name":"test",
"queue":"testqueue",
"priority":"3",
"am-container-spec":
{
"local-resources":
{
"entry":
{
"key":"example",
"value":
[
{
"resource":"http://www.test.com/file.txt",
"type":"FILE",
"visibility":"APPLICATION",
"size":"100",
"timestamp":"1404203616003"
"key":"AppMaster.jar",
"value":
{
"resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
"type":"FILE",
"visibility":"APPLICATION",
"size": "43004",
"timestamp": "1405452071209"
}
}
}
]
},
"commands":
{
"command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
},
"environment":
{
"entry":
{
"key":"APP_VAR",
"value":"ENV_SETTING"
}
},
"commands":
{
"command":"/bin/sleep 5"
},
"service-data":
{
"entry":
{
"key":"test",
"value":"dmFsdWUxMg"
}
},
"credentials":
{
"tokens":null,
"secrets":
{
"entry":
{
"key":"secret1",
"value":"c2VjcmV0MQ"
}
}
},
"application-acls":
{
"entry":
[
{
"key":"VIEW_APP",
"value":"testuser3, testuser4"
"key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
"value": "1405459400754"
},
{
"key":"MODIFY_APP",
"value":"testuser1, testuser2"
"key": "CLASSPATH",
"value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLEN",
"value": "6"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
"value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
}
]
}
@ -2302,16 +2282,9 @@ _01_000001</amContainerLogs>
"vCores":"1"
},
"application-type":"YARN",
"keep-containers-across-application-attempts":"false",
"application-tags":
{
"tag":
[
"tag 2",
"tag1"
]
}
"keep-containers-across-application-attempts":"false"
}
+---+
Response Header:
@ -2349,22 +2322,34 @@ _01_000001</amContainerLogs>
<entry>
<key>example</key>
<value>
<resource>http://www.test.com/file.txt</resource>
<resource>hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar</resource>
<type>FILE</type>
<visibility>APPLICATION</visibility>
<size>100</size>
<timestamp>1404204892877</timestamp>
<size>43004</size>
<timestamp>1405452071209</timestamp>
</value>
</entry>
</local-resources>
<environment>
<entry>
<key>APP_VAR</key>
<value>ENV_SETTING</value>
<key>DISTRIBUTEDSHELLSCRIPTTIMESTAMP</key>
<value>1405459400754</value>
</entry>
<entry>
<key>CLASSPATH</key>
<value>{{CLASSPATH}}&lt;CPS&gt;./*&lt;CPS&gt;{{HADOOP_CONF_DIR}}&lt;CPS&gt;{{HADOOP_COMMON_HOME}}/share/hadoop/common/*&lt;CPS&gt;{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*&lt;CPS&gt;{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*&lt;CPS&gt;{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*&lt;CPS&gt;{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*&lt;CPS&gt;{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*&lt;CPS&gt;./log4j.properties</value>
</entry>
<entry>
<key>DISTRIBUTEDSHELLSCRIPTLEN</key>
<value>6</value>
</entry>
<entry>
<key>DISTRIBUTEDSHELLSCRIPTLOCATION</key>
<value>hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands</value>
</entry>
</environment>
<commands>
<command>/bin/sleep 5</command>
<command>{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1&gt;&lt;LOG_DIR&gt;/AppMaster.stdout 2&gt;&lt;LOG_DIR&gt;/AppMaster.stderr</command>
</commands>
<service-data>
<entry>