HDFS-6605.Client server negotiation of cipher suite. (wang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1607499 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
31617733ac
commit
51b97a1396
|
@ -25,16 +25,19 @@ import com.google.common.base.Preconditions;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class AESCTRCryptoCodec extends CryptoCodec {
|
||||
|
||||
protected static final CipherSuite SUITE = CipherSuite.AES_CTR_NOPADDING;
|
||||
|
||||
/**
|
||||
* For AES, the algorithm block is fixed size of 128 bits.
|
||||
* @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
|
||||
*/
|
||||
private static final int AES_BLOCK_SIZE = 16;
|
||||
private static final int AES_BLOCK_SIZE = SUITE.getAlgorithmBlockSize();
|
||||
private static final int CTR_OFFSET = 8;
|
||||
|
||||
@Override
|
||||
public int getAlgorithmBlockSize() {
|
||||
return AES_BLOCK_SIZE;
|
||||
public CipherSuite getCipherSuite() {
|
||||
return SUITE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.crypto;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Defines properties of a CipherSuite. Modeled after the ciphers in
|
||||
|
@ -27,14 +26,25 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public enum CipherSuite {
|
||||
AES_CTR_NOPADDING("AES/CTR/NoPadding", 128);
|
||||
UNKNOWN("Unknown", 0),
|
||||
AES_CTR_NOPADDING("AES/CTR/NoPadding", 16);
|
||||
|
||||
private final String name;
|
||||
private final int blockBits;
|
||||
private final int algoBlockSize;
|
||||
|
||||
CipherSuite(String name, int blockBits) {
|
||||
private Integer unknownValue = null;
|
||||
|
||||
CipherSuite(String name, int algoBlockSize) {
|
||||
this.name = name;
|
||||
this.blockBits = blockBits;
|
||||
this.algoBlockSize = algoBlockSize;
|
||||
}
|
||||
|
||||
public void setUnknownValue(int unknown) {
|
||||
this.unknownValue = unknown;
|
||||
}
|
||||
|
||||
public int getUnknownValue() {
|
||||
return unknownValue;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -45,17 +55,20 @@ public enum CipherSuite {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return size of an algorithm block in bits
|
||||
* @return size of an algorithm block in bytes
|
||||
*/
|
||||
public int getNumberBlockBits() {
|
||||
return blockBits;
|
||||
public int getAlgorithmBlockSize() {
|
||||
return algoBlockSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder("{");
|
||||
builder.append("name: " + getName() + ", ");
|
||||
builder.append("numBlockBits: " + getNumberBlockBits());
|
||||
builder.append("name: " + name);
|
||||
builder.append(", algorithmBlockSize: " + algoBlockSize);
|
||||
if (unknownValue != null) {
|
||||
builder.append(", unknownValue: " + unknownValue);
|
||||
}
|
||||
builder.append("}");
|
||||
return builder.toString();
|
||||
}
|
||||
|
|
|
@ -39,13 +39,11 @@ public abstract class CryptoCodec implements Configurable {
|
|||
CryptoCodec.class);
|
||||
return ReflectionUtils.newInstance(klass, conf);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the block size of a block cipher.
|
||||
* For different algorithms, the block size may be different.
|
||||
* @return int the block size
|
||||
* @return the CipherSuite for this codec.
|
||||
*/
|
||||
public abstract int getAlgorithmBlockSize();
|
||||
public abstract CipherSuite getCipherSuite();
|
||||
|
||||
/**
|
||||
* Create a {@link org.apache.hadoop.crypto.Encryptor}.
|
||||
|
|
|
@ -265,11 +265,11 @@ public class CryptoInputStream extends FilterInputStream implements
|
|||
}
|
||||
|
||||
private long getCounter(long position) {
|
||||
return position / codec.getAlgorithmBlockSize();
|
||||
return position / codec.getCipherSuite().getAlgorithmBlockSize();
|
||||
}
|
||||
|
||||
private byte getPadding(long position) {
|
||||
return (byte)(position % codec.getAlgorithmBlockSize());
|
||||
return (byte)(position % codec.getCipherSuite().getAlgorithmBlockSize());
|
||||
}
|
||||
|
||||
/** Calculate the counter and iv, update the decryptor. */
|
||||
|
|
|
@ -194,8 +194,10 @@ public class CryptoOutputStream extends FilterOutputStream implements
|
|||
|
||||
/** Update the {@link #encryptor}: calculate counter and {@link #padding}. */
|
||||
private void updateEncryptor() throws IOException {
|
||||
final long counter = streamOffset / codec.getAlgorithmBlockSize();
|
||||
padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
|
||||
final long counter =
|
||||
streamOffset / codec.getCipherSuite().getAlgorithmBlockSize();
|
||||
padding =
|
||||
(byte)(streamOffset % codec.getCipherSuite().getAlgorithmBlockSize());
|
||||
inBuffer.position(padding); // Set proper position for input data.
|
||||
codec.calculateIV(initIV, counter, iv);
|
||||
encryptor.init(key, iv);
|
||||
|
|
|
@ -53,7 +53,8 @@ public class CryptoStreamUtils {
|
|||
public static int checkBufferSize(CryptoCodec codec, int bufferSize) {
|
||||
Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE,
|
||||
"Minimum value of buffer size is " + MIN_BUFFER_SIZE + ".");
|
||||
return bufferSize - bufferSize % codec.getAlgorithmBlockSize();
|
||||
return bufferSize - bufferSize % codec.getCipherSuite()
|
||||
.getAlgorithmBlockSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -92,9 +92,9 @@ public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec {
|
|||
throws GeneralSecurityException {
|
||||
this.mode = mode;
|
||||
if (provider == null || provider.isEmpty()) {
|
||||
cipher = Cipher.getInstance("AES/CTR/NoPadding");
|
||||
cipher = Cipher.getInstance(SUITE.getName());
|
||||
} else {
|
||||
cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
|
||||
cipher = Cipher.getInstance(SUITE.getName(), provider);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.fs;
|
|||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
@ -40,9 +39,9 @@ public class FileEncryptionInfo {
|
|||
checkNotNull(suite);
|
||||
checkNotNull(key);
|
||||
checkNotNull(iv);
|
||||
checkArgument(key.length == suite.getNumberBlockBits() / 8,
|
||||
checkArgument(key.length == suite.getAlgorithmBlockSize(),
|
||||
"Unexpected key length");
|
||||
checkArgument(iv.length == suite.getNumberBlockBits() / 8,
|
||||
checkArgument(iv.length == suite.getAlgorithmBlockSize(),
|
||||
"Unexpected IV length");
|
||||
this.cipherSuite = suite;
|
||||
this.key = key;
|
||||
|
|
|
@ -28,6 +28,8 @@ fs-encryption (Unreleased)
|
|||
|
||||
HDFS-6389. Rename restrictions for encryption zones. (clamb)
|
||||
|
||||
HDFS-6605. Client server negotiation of cipher suite. (wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -94,6 +94,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||
|
@ -246,6 +247,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
new DFSHedgedReadMetrics();
|
||||
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
||||
private final CryptoCodec codec;
|
||||
@VisibleForTesting
|
||||
List<CipherSuite> cipherSuites;
|
||||
|
||||
/**
|
||||
* DFSClient configuration
|
||||
|
@ -579,6 +582,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
|
||||
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
|
||||
this.codec = CryptoCodec.getInstance(conf);
|
||||
this.cipherSuites = Lists.newArrayListWithCapacity(1);
|
||||
cipherSuites.add(codec.getCipherSuite());
|
||||
|
||||
int numResponseToDrop = conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
||||
|
@ -1523,7 +1528,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
}
|
||||
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
||||
src, masked, flag, createParent, replication, blockSize, progress,
|
||||
buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
|
||||
buffersize, dfsClientConf.createChecksum(checksumOpt),
|
||||
favoredNodeStrs, cipherSuites);
|
||||
beginFileLease(result.getFileId(), result);
|
||||
return result;
|
||||
}
|
||||
|
@ -1570,7 +1576,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
|
||||
result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
|
||||
flag, createParent, replication, blockSize, progress, buffersize,
|
||||
checksum);
|
||||
checksum, null, cipherSuites);
|
||||
}
|
||||
beginFileLease(result.getFileId(), result);
|
||||
return result;
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSOutputSummer;
|
||||
|
@ -1605,12 +1606,13 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, Progressable progress, int buffersize,
|
||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||
DataChecksum checksum, String[] favoredNodes,
|
||||
List<CipherSuite> cipherSuites) throws IOException {
|
||||
final HdfsFileStatus stat;
|
||||
try {
|
||||
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
|
||||
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
|
||||
blockSize);
|
||||
blockSize, cipherSuites);
|
||||
} catch(RemoteException re) {
|
||||
throw re.unwrapRemoteException(AccessControlException.class,
|
||||
DSQuotaExceededException.class,
|
||||
|
@ -1620,7 +1622,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
NSQuotaExceededException.class,
|
||||
SafeModeException.class,
|
||||
UnresolvedPathException.class,
|
||||
SnapshotAccessControlException.class);
|
||||
SnapshotAccessControlException.class,
|
||||
UnknownCipherSuiteException.class);
|
||||
}
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
|
||||
flag, progress, checksum, favoredNodes);
|
||||
|
@ -1628,14 +1631,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
return out;
|
||||
}
|
||||
|
||||
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, Progressable progress, int buffersize,
|
||||
DataChecksum checksum) throws IOException {
|
||||
return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication,
|
||||
blockSize, progress, buffersize, checksum, null);
|
||||
}
|
||||
|
||||
/** Construct a new output stream for append. */
|
||||
private DFSOutputStream(DFSClient dfsClient, String src,
|
||||
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class UnknownCipherSuiteException extends IOException {
|
||||
private static final long serialVersionUID = 8957192l;
|
||||
|
||||
public UnknownCipherSuiteException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public UnknownCipherSuiteException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -186,7 +187,8 @@ public interface ClientProtocol {
|
|||
@AtMostOnce
|
||||
public HdfsFileStatus create(String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize)
|
||||
boolean createParent, short replication, long blockSize,
|
||||
List<CipherSuite> cipherSuites)
|
||||
throws AccessControlException, AlreadyBeingCreatedException,
|
||||
DSQuotaExceededException, FileAlreadyExistsException,
|
||||
FileNotFoundException, NSQuotaExceededException,
|
||||
|
|
|
@ -375,7 +375,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
HdfsFileStatus result = server.create(req.getSrc(),
|
||||
PBHelper.convert(req.getMasked()), req.getClientName(),
|
||||
PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
|
||||
(short) req.getReplication(), req.getBlockSize());
|
||||
(short) req.getReplication(), req.getBlockSize(),
|
||||
PBHelper.convertCipherSuiteProtos(req.getCipherSuitesList()));
|
||||
|
||||
if (result != null) {
|
||||
return CreateResponseProto.newBuilder().setFs(PBHelper.convert(result))
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
|
@ -249,21 +250,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
@Override
|
||||
public HdfsFileStatus create(String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize)
|
||||
boolean createParent, short replication, long blockSize,
|
||||
List<CipherSuite> cipherSuites)
|
||||
throws AccessControlException, AlreadyBeingCreatedException,
|
||||
DSQuotaExceededException, FileAlreadyExistsException,
|
||||
FileNotFoundException, NSQuotaExceededException,
|
||||
ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
|
||||
IOException {
|
||||
CreateRequestProto req = CreateRequestProto.newBuilder()
|
||||
CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
|
||||
.setSrc(src)
|
||||
.setMasked(PBHelper.convert(masked))
|
||||
.setClientName(clientName)
|
||||
.setCreateFlag(PBHelper.convertCreateFlag(flag))
|
||||
.setCreateParent(createParent)
|
||||
.setReplication(replication)
|
||||
.setBlockSize(blockSize)
|
||||
.build();
|
||||
.setBlockSize(blockSize);
|
||||
if (cipherSuites != null) {
|
||||
builder.addAllCipherSuites(PBHelper.convertCipherSuites(cipherSuites));
|
||||
}
|
||||
CreateRequestProto req = builder.build();
|
||||
try {
|
||||
CreateResponseProto res = rpcProxy.create(null, req);
|
||||
return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
|
||||
|
|
|
@ -2276,25 +2276,49 @@ public class PBHelper {
|
|||
return new ShmId(shmId.getHi(), shmId.getLo());
|
||||
}
|
||||
|
||||
public static HdfsProtos.FileEncryptionInfoProto.CipherType
|
||||
convert(CipherSuite type) {
|
||||
switch (type) {
|
||||
public static HdfsProtos.CipherSuite convert(CipherSuite suite) {
|
||||
switch (suite) {
|
||||
case UNKNOWN:
|
||||
return HdfsProtos.CipherSuite.UNKNOWN;
|
||||
case AES_CTR_NOPADDING:
|
||||
return HdfsProtos.FileEncryptionInfoProto.CipherType
|
||||
.AES_CTR_NOPADDING;
|
||||
return HdfsProtos.CipherSuite.AES_CTR_NOPADDING;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static CipherSuite convert(
|
||||
HdfsProtos.FileEncryptionInfoProto.CipherType proto) {
|
||||
public static CipherSuite convert(HdfsProtos.CipherSuite proto) {
|
||||
switch (proto) {
|
||||
case AES_CTR_NOPADDING:
|
||||
return CipherSuite.AES_CTR_NOPADDING;
|
||||
default:
|
||||
// Set to UNKNOWN and stash the unknown enum value
|
||||
CipherSuite suite = CipherSuite.UNKNOWN;
|
||||
suite.setUnknownValue(proto.getNumber());
|
||||
return suite;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<HdfsProtos.CipherSuite> convertCipherSuites
|
||||
(List<CipherSuite> suites) {
|
||||
if (suites == null) {
|
||||
return null;
|
||||
}
|
||||
List<HdfsProtos.CipherSuite> protos =
|
||||
Lists.newArrayListWithCapacity(suites.size());
|
||||
for (CipherSuite suite : suites) {
|
||||
protos.add(convert(suite));
|
||||
}
|
||||
return protos;
|
||||
}
|
||||
|
||||
public static List<CipherSuite> convertCipherSuiteProtos(
|
||||
List<HdfsProtos.CipherSuite> protos) {
|
||||
List<CipherSuite> suites = Lists.newArrayListWithCapacity(protos.size());
|
||||
for (HdfsProtos.CipherSuite proto : protos) {
|
||||
suites.add(convert(proto));
|
||||
}
|
||||
return suites;
|
||||
}
|
||||
|
||||
public static HdfsProtos.FileEncryptionInfoProto convert(
|
||||
|
@ -2303,7 +2327,7 @@ public class PBHelper {
|
|||
return null;
|
||||
}
|
||||
return HdfsProtos.FileEncryptionInfoProto.newBuilder()
|
||||
.setType(convert(info.getCipherSuite()))
|
||||
.setSuite(convert(info.getCipherSuite()))
|
||||
.setKey(getByteString(info.getEncryptedDataEncryptionKey()))
|
||||
.setIv(getByteString(info.getIV()))
|
||||
.build();
|
||||
|
@ -2314,10 +2338,10 @@ public class PBHelper {
|
|||
if (proto == null) {
|
||||
return null;
|
||||
}
|
||||
CipherSuite type = convert(proto.getType());
|
||||
CipherSuite suite = convert(proto.getSuite());
|
||||
byte[] key = proto.getKey().toByteArray();
|
||||
byte[] iv = proto.getIv().toByteArray();
|
||||
return new FileEncryptionInfo(type, key, iv);
|
||||
return new FileEncryptionInfo(suite, key, iv);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -850,7 +850,7 @@ public class BlockManager {
|
|||
return null;
|
||||
} else if (blocks.length == 0) {
|
||||
return new LocatedBlocks(0, isFileUnderConstruction,
|
||||
Collections.<LocatedBlock>emptyList(), null, false, null);
|
||||
Collections.<LocatedBlock>emptyList(), null, false, feInfo);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
||||
|
|
|
@ -122,7 +122,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
|
@ -154,6 +153,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.UnknownCipherSuiteException;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
@ -2296,7 +2296,50 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* If the file is within an encryption zone, select the appropriate
|
||||
* CipherSuite from the list provided by the client. Since the client may
|
||||
* be newer, need to handle unknown CipherSuites.
|
||||
*
|
||||
* @param src path of the file
|
||||
* @param cipherSuites client-provided list of supported CipherSuites,
|
||||
* in desired order.
|
||||
* @return chosen CipherSuite, or null if file is not in an EncryptionZone
|
||||
* @throws IOException
|
||||
*/
|
||||
private CipherSuite chooseCipherSuite(String src, List<CipherSuite>
|
||||
cipherSuites) throws UnknownCipherSuiteException {
|
||||
EncryptionZone zone = getEncryptionZoneForPath(src);
|
||||
// Not in an EZ
|
||||
if (zone == null) {
|
||||
return null;
|
||||
}
|
||||
CipherSuite chosen = null;
|
||||
for (CipherSuite c : cipherSuites) {
|
||||
if (c.equals(CipherSuite.UNKNOWN)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring unknown CipherSuite provided by client: "
|
||||
+ c.getUnknownValue());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
for (CipherSuite supported : CipherSuite.values()) {
|
||||
if (supported.equals(c)) {
|
||||
chosen = c;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (chosen == null) {
|
||||
throw new UnknownCipherSuiteException(
|
||||
"No cipher suites provided by the client are supported."
|
||||
+ " Client provided: " + Arrays.toString(cipherSuites.toArray())
|
||||
+ " NameNode supports: " + Arrays.toString(CipherSuite.values()));
|
||||
}
|
||||
return chosen;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file entry in the namespace.
|
||||
*
|
||||
|
@ -2306,7 +2349,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
HdfsFileStatus startFile(String src, PermissionStatus permissions,
|
||||
String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize)
|
||||
boolean createParent, short replication, long blockSize,
|
||||
List<CipherSuite> cipherSuites)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
|
@ -2319,7 +2363,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
try {
|
||||
status = startFileInt(src, permissions, holder, clientMachine, flag,
|
||||
createParent, replication, blockSize, cacheEntry != null);
|
||||
createParent, replication, blockSize, cipherSuites,
|
||||
cacheEntry != null);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "create", src);
|
||||
throw e;
|
||||
|
@ -2332,16 +2377,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
|
||||
String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
boolean logRetryCache) throws AccessControlException, SafeModeException,
|
||||
List<CipherSuite> cipherSuites, boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
|
||||
+ ", holder=" + holder
|
||||
+ ", clientMachine=" + clientMachine
|
||||
+ ", createParent=" + createParent
|
||||
+ ", replication=" + replication
|
||||
+ ", createFlag=" + flag.toString());
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("DIR* NameSystem.startFile: src=" + src
|
||||
+ ", holder=" + holder
|
||||
+ ", clientMachine=" + clientMachine
|
||||
+ ", createParent=" + createParent
|
||||
+ ", replication=" + replication
|
||||
+ ", createFlag=" + flag.toString()
|
||||
+ ", blockSize=" + blockSize);
|
||||
builder.append(", cipherSuites=");
|
||||
if (cipherSuites != null) {
|
||||
builder.append(Arrays.toString(cipherSuites.toArray()));
|
||||
} else {
|
||||
builder.append("null");
|
||||
}
|
||||
NameNode.stateChangeLog.debug(builder.toString());
|
||||
}
|
||||
if (!DFSUtil.isValidName(src)) {
|
||||
throw new InvalidPathException(src);
|
||||
|
@ -2368,7 +2423,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
checkNameNodeSafeMode("Cannot create file" + src);
|
||||
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
||||
startFileInternal(pc, src, permissions, holder, clientMachine, create,
|
||||
overwrite, createParent, replication, blockSize, logRetryCache);
|
||||
overwrite, createParent, replication, blockSize, cipherSuites,
|
||||
logRetryCache);
|
||||
stat = dir.getFileInfo(src, false);
|
||||
} catch (StandbyException se) {
|
||||
skipSync = true;
|
||||
|
@ -2398,7 +2454,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
private void startFileInternal(FSPermissionChecker pc, String src,
|
||||
PermissionStatus permissions, String holder, String clientMachine,
|
||||
boolean create, boolean overwrite, boolean createParent,
|
||||
short replication, long blockSize, boolean logRetryEntry)
|
||||
short replication, long blockSize, List<CipherSuite> cipherSuites,
|
||||
boolean logRetryEntry)
|
||||
throws FileAlreadyExistsException, AccessControlException,
|
||||
UnresolvedLinkException, FileNotFoundException,
|
||||
ParentNotDirectoryException, IOException {
|
||||
|
@ -2410,6 +2467,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throw new FileAlreadyExistsException(src +
|
||||
" already exists as a directory");
|
||||
}
|
||||
|
||||
FileEncryptionInfo feInfo = null;
|
||||
CipherSuite suite = chooseCipherSuite(src, cipherSuites);
|
||||
if (suite != null) {
|
||||
Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
|
||||
"Chose an UNKNOWN CipherSuite!");
|
||||
// TODO: fill in actual key/iv in HDFS-6474
|
||||
// For now, populate with dummy data
|
||||
byte[] key = new byte[suite.getAlgorithmBlockSize()];
|
||||
for (int i = 0; i < key.length; i++) {
|
||||
key[i] = (byte)i;
|
||||
}
|
||||
byte[] iv = new byte[suite.getAlgorithmBlockSize()];
|
||||
for (int i = 0; i < iv.length; i++) {
|
||||
iv[i] = (byte)(3+i*2);
|
||||
}
|
||||
feInfo = new FileEncryptionInfo(suite, key, iv);
|
||||
}
|
||||
|
||||
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||
if (isPermissionEnabled) {
|
||||
if (overwrite && myFile != null) {
|
||||
|
@ -2465,6 +2541,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
|
||||
.getClientName(), src);
|
||||
|
||||
// Set encryption attributes if necessary
|
||||
if (feInfo != null) {
|
||||
dir.setFileEncryptionInfo(src, feInfo);
|
||||
newNode = dir.getInode(newNode.getId()).asFile();
|
||||
}
|
||||
|
||||
// record file record in log, record new generation stamp
|
||||
getEditLog().logOpenFile(src, newNode, logRetryEntry);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
|
@ -8301,7 +8383,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
final String keyId = UUID.randomUUID().toString();
|
||||
// TODO pass in hdfs://HOST:PORT (HDFS-6490)
|
||||
providerOptions.setDescription(src);
|
||||
providerOptions.setBitLength(codec.getAlgorithmBlockSize()*8);
|
||||
providerOptions.setBitLength(codec.getCipherSuite()
|
||||
.getAlgorithmBlockSize()*8);
|
||||
try {
|
||||
provider.createKey(keyId, providerOptions);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
|
@ -8396,6 +8479,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
/** Lookup the encryption zone of a path. */
|
||||
EncryptionZone getEncryptionZoneForPath(String src) {
|
||||
assert hasReadLock();
|
||||
final String[] components = INode.getPathNames(src);
|
||||
for (int i = components.length; i > 0; i--) {
|
||||
final List<String> l = Arrays.asList(Arrays.copyOfRange(components, 0, i));
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.util.Set;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
@ -534,7 +535,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override // ClientProtocol
|
||||
public HdfsFileStatus create(String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize)
|
||||
boolean createParent, short replication, long blockSize,
|
||||
List<CipherSuite> cipherSuites)
|
||||
throws IOException {
|
||||
String clientMachine = getClientMachine();
|
||||
if (stateChangeLog.isDebugEnabled()) {
|
||||
|
@ -548,7 +550,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
|
||||
getRemoteUser().getShortUserName(), null, masked),
|
||||
clientName, clientMachine, flag.get(), createParent, replication,
|
||||
blockSize);
|
||||
blockSize, cipherSuites);
|
||||
metrics.incrFilesCreated();
|
||||
metrics.incrCreateFileOps();
|
||||
return fileStatus;
|
||||
|
|
|
@ -74,6 +74,7 @@ message CreateRequestProto {
|
|||
required bool createParent = 5;
|
||||
required uint32 replication = 6; // Short: Only 16 bits used
|
||||
required uint64 blockSize = 7;
|
||||
repeated CipherSuite cipherSuites = 8;
|
||||
}
|
||||
|
||||
message CreateResponseProto {
|
||||
|
|
|
@ -169,14 +169,19 @@ message DataEncryptionKeyProto {
|
|||
optional string encryptionAlgorithm = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cipher suite.
|
||||
*/
|
||||
enum CipherSuite {
|
||||
UNKNOWN = 1;
|
||||
AES_CTR_NOPADDING = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encryption information for a file.
|
||||
*/
|
||||
message FileEncryptionInfoProto {
|
||||
enum CipherType {
|
||||
AES_CTR_NOPADDING = 1;
|
||||
}
|
||||
required CipherType type = 1;
|
||||
required CipherSuite suite = 1;
|
||||
required bytes key = 2;
|
||||
required bytes iv = 3;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyList;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyShort;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -262,7 +264,7 @@ public class TestDFSClientRetries {
|
|||
.when(mockNN)
|
||||
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
||||
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
||||
anyShort(), anyLong());
|
||||
anyShort(), anyLong(), (List<CipherSuite>) anyList());
|
||||
|
||||
final DFSClient client = new DFSClient(null, mockNN, conf, null);
|
||||
OutputStream os = client.create("testfile", true);
|
||||
|
|
|
@ -26,15 +26,19 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -44,6 +48,7 @@ import org.junit.Test;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestEncryptionZonesAPI {
|
||||
|
@ -56,7 +61,7 @@ public class TestEncryptionZonesAPI {
|
|||
private final Configuration conf = new Configuration();
|
||||
private MiniDFSCluster cluster;
|
||||
private static File tmpDir;
|
||||
private FileSystem fs;
|
||||
private DistributedFileSystem fs;
|
||||
|
||||
@Before
|
||||
public void setUpCluster() throws IOException {
|
||||
|
@ -65,7 +70,7 @@ public class TestEncryptionZonesAPI {
|
|||
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
fs = createFileSystem(conf);
|
||||
fs = (DistributedFileSystem) createFileSystem(conf);
|
||||
}
|
||||
|
||||
protected FileSystem createFileSystem(Configuration conf) throws IOException {
|
||||
|
@ -424,4 +429,66 @@ public class TestEncryptionZonesAPI {
|
|||
"/test/foo/baz can't be moved from an encryption zone.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCipherSuiteNegotiation() throws Exception {
|
||||
final HdfsAdmin dfsAdmin =
|
||||
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
|
||||
final Path zone = new Path("/zone");
|
||||
fs.mkdirs(zone);
|
||||
dfsAdmin.createEncryptionZone(zone, null);
|
||||
// Create a file in an EZ, which should succeed
|
||||
DFSTestUtil.createFile(fs, new Path(zone, "success1"), 0, (short) 1,
|
||||
0xFEED);
|
||||
// Pass no cipherSuites, fail
|
||||
fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(0);
|
||||
try {
|
||||
DFSTestUtil.createFile(fs, new Path(zone, "fail"), 0, (short) 1,
|
||||
0xFEED);
|
||||
fail("Created a file without specifying a CipherSuite!");
|
||||
} catch (UnknownCipherSuiteException e) {
|
||||
GenericTestUtils.assertExceptionContains("No cipher suites", e);
|
||||
}
|
||||
// Pass some unknown cipherSuites, fail
|
||||
fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
try {
|
||||
DFSTestUtil.createFile(fs, new Path(zone, "fail"), 0, (short) 1,
|
||||
0xFEED);
|
||||
fail("Created a file without specifying a CipherSuite!");
|
||||
} catch (UnknownCipherSuiteException e) {
|
||||
GenericTestUtils.assertExceptionContains("No cipher suites", e);
|
||||
}
|
||||
// Pass some unknown and a good cipherSuites, success
|
||||
fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
DFSTestUtil.createFile(fs, new Path(zone, "success2"), 0, (short) 1,
|
||||
0xFEED);
|
||||
fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
|
||||
fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
|
||||
DFSTestUtil.createFile(fs, new Path(zone, "success3"), 4096, (short) 1,
|
||||
0xFEED);
|
||||
// Check that the specified CipherSuite was correctly saved on the NN
|
||||
for (int i=2; i<=3; i++) {
|
||||
LocatedBlocks blocks =
|
||||
fs.getClient().getLocatedBlocks(zone.toString() + "/success2", 0);
|
||||
FileEncryptionInfo feInfo = blocks.getFileEncryptionInfo();
|
||||
assertEquals(feInfo.getCipherSuite(), CipherSuite.AES_CTR_NOPADDING);
|
||||
// TODO: validate against actual key/iv in HDFS-6474
|
||||
byte[] key = feInfo.getEncryptedDataEncryptionKey();
|
||||
for (int j = 0; j < key.length; j++) {
|
||||
assertEquals("Unexpected key byte", (byte)j, key[j]);
|
||||
}
|
||||
byte[] iv = feInfo.getIV();
|
||||
for (int j = 0; j < iv.length; j++) {
|
||||
assertEquals("Unexpected IV byte", (byte)(3+j*2), iv[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1131,7 +1131,7 @@ public class TestFileCreation {
|
|||
try {
|
||||
nnrpc.create(pathStr, new FsPermission((short)0755), "client",
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||
true, (short)1, 128*1024*1024L);
|
||||
true, (short)1, 128*1024*1024L, null);
|
||||
fail("Should have thrown exception when creating '"
|
||||
+ pathStr + "'" + " by " + method);
|
||||
} catch (InvalidPathException ipe) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.mockito.Matchers.anyList;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.anyShort;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
|
@ -29,10 +30,12 @@ import static org.mockito.Mockito.spy;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -348,7 +351,7 @@ public class TestLease {
|
|||
.when(mcp)
|
||||
.create(anyString(), (FsPermission) anyObject(), anyString(),
|
||||
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
|
||||
anyShort(), anyLong());
|
||||
anyShort(), anyLong(), (List<CipherSuite>) anyList());
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final DFSClient c1 = createDFSClientAs(ugi[0], conf);
|
||||
|
|
|
@ -587,7 +587,8 @@ public class NNThroughputBenchmark implements Tool {
|
|||
// dummyActionNoSynch(fileIdx);
|
||||
nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
|
||||
clientName, new EnumSetWritable<CreateFlag>(EnumSet
|
||||
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
|
||||
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
|
||||
replication, BLOCK_SIZE, null);
|
||||
long end = Time.now();
|
||||
for(boolean written = !closeUponCreate; !written;
|
||||
written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
|
||||
|
@ -1133,7 +1134,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
String fileName = nameGenerator.getNextFileName("ThroughputBench");
|
||||
nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
|
||||
BLOCK_SIZE);
|
||||
BLOCK_SIZE, null);
|
||||
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
|
||||
nameNodeProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ public class TestAddBlockRetry {
|
|||
nn.create(src, FsPermission.getFileDefault(),
|
||||
"clientName",
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||
true, (short)3, 1024);
|
||||
true, (short)3, 1024, null);
|
||||
|
||||
// start first addBlock()
|
||||
LOG.info("Starting first addBlock for " + src);
|
||||
|
@ -155,7 +155,7 @@ public class TestAddBlockRetry {
|
|||
// create file
|
||||
nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
|
||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
|
||||
(short) 3, 1024);
|
||||
(short) 3, 1024, null);
|
||||
// start first addBlock()
|
||||
LOG.info("Starting first addBlock for " + src);
|
||||
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
|
||||
|
|
|
@ -209,19 +209,20 @@ public class TestNamenodeRetryCache {
|
|||
// Two retried calls succeed
|
||||
newCall();
|
||||
HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
|
||||
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
|
||||
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1,
|
||||
BlockSize, null);
|
||||
Assert.assertEquals(status, namesystem.startFile(src, perm,
|
||||
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
|
||||
true, (short) 1, BlockSize));
|
||||
true, (short) 1, BlockSize, null));
|
||||
Assert.assertEquals(status, namesystem.startFile(src, perm,
|
||||
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
|
||||
true, (short) 1, BlockSize));
|
||||
true, (short) 1, BlockSize, null));
|
||||
|
||||
// A non-retried call fails
|
||||
newCall();
|
||||
try {
|
||||
namesystem.startFile(src, perm, "holder", "clientmachine",
|
||||
EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
|
||||
EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize, null);
|
||||
Assert.fail("testCreate - expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
|
|
|
@ -394,7 +394,7 @@ public class TestRetryCacheWithHA {
|
|||
this.status = client.getNamenode().create(fileName,
|
||||
FsPermission.getFileDefault(), client.getClientName(),
|
||||
new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
|
||||
BlockSize);
|
||||
BlockSize, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue