HDFS-7839. Erasure coding: implement facilities in NameNode to create and manage EC zones. Contributed by Zhe Zhang
This commit is contained in:
parent
578019d6a2
commit
1af8c14862
|
@ -2994,6 +2994,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
return new EncryptionZoneIterator(namenode, traceSampler);
|
||||
}
|
||||
|
||||
public void createErasureCodingZone(String src)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
|
||||
try {
|
||||
namenode.createErasureCodingZone(src);
|
||||
} catch (RemoteException re) {
|
||||
throw re.unwrapRemoteException(AccessControlException.class,
|
||||
SafeModeException.class,
|
||||
UnresolvedPathException.class);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void setXAttr(String src, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
checkOpen();
|
||||
|
|
|
@ -1362,6 +1362,14 @@ public interface ClientProtocol {
|
|||
public BatchedEntries<EncryptionZone> listEncryptionZones(
|
||||
long prevId) throws IOException;
|
||||
|
||||
/**
|
||||
* Create an erasure coding zone (currently with hardcoded schema)
|
||||
* TODO: Configurable and pluggable schemas (HDFS-7337)
|
||||
*/
|
||||
@Idempotent
|
||||
public void createErasureCodingZone(String src)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Set xattr of a file or directory.
|
||||
* The name must be prefixed with the namespace followed by ".". For example,
|
||||
|
|
|
@ -192,6 +192,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
|
||||
|
@ -1390,6 +1392,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CreateErasureCodingZoneResponseProto createErasureCodingZone(
|
||||
RpcController controller, CreateErasureCodingZoneRequestProto req)
|
||||
throws ServiceException {
|
||||
try {
|
||||
server.createErasureCodingZone(req.getSrc());
|
||||
return CreateErasureCodingZoneResponseProto.newBuilder().build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetXAttrResponseProto setXAttr(RpcController controller,
|
||||
SetXAttrRequestProto req) throws ServiceException {
|
||||
|
|
|
@ -160,6 +160,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||
|
@ -1406,6 +1408,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createErasureCodingZone(String src)
|
||||
throws IOException {
|
||||
final CreateErasureCodingZoneRequestProto.Builder builder =
|
||||
CreateErasureCodingZoneRequestProto.newBuilder();
|
||||
builder.setSrc(src);
|
||||
CreateErasureCodingZoneRequestProto req = builder.build();
|
||||
try {
|
||||
rpcProxy.createErasureCodingZone(null, req);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||
throws IOException {
|
||||
|
|
|
@ -396,4 +396,6 @@ public interface HdfsServerConstants {
|
|||
"raw.hdfs.crypto.file.encryption.info";
|
||||
String SECURITY_XATTR_UNREADABLE_BY_SUPERUSER =
|
||||
"security.hdfs.unreadable.by.superuser";
|
||||
public static final String XATTR_ERASURECODING_ZONE =
|
||||
"raw.hdfs.erasurecoding.zone";
|
||||
}
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE;
|
||||
|
||||
/**
|
||||
* Manages the list of erasure coding zones in the filesystem.
|
||||
* <p/>
|
||||
* The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory
|
||||
* lock being held for many operations. The FSDirectory lock should not be
|
||||
* taken if the manager lock is already held.
|
||||
* TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager}
|
||||
*/
|
||||
public class ErasureCodingZoneManager {
|
||||
private final FSDirectory dir;
|
||||
|
||||
/**
|
||||
* Construct a new ErasureCodingZoneManager.
|
||||
*
|
||||
* @param dir Enclosing FSDirectory
|
||||
*/
|
||||
public ErasureCodingZoneManager(FSDirectory dir) {
|
||||
this.dir = dir;
|
||||
}
|
||||
|
||||
boolean getECPolicy(INodesInPath iip) {
|
||||
assert dir.hasReadLock();
|
||||
Preconditions.checkNotNull(iip);
|
||||
List<INode> inodes = iip.getReadOnlyINodes();
|
||||
for (int i = inodes.size() - 1; i >= 0; i--) {
|
||||
final INode inode = inodes.get(i);
|
||||
if (inode == null) {
|
||||
continue;
|
||||
}
|
||||
final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
|
||||
new ArrayList<XAttr>(0)
|
||||
: inode.getXAttrFeature().getXAttrs();
|
||||
for (XAttr xAttr : xAttrs) {
|
||||
if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
XAttr createErasureCodingZone(String src)
|
||||
throws IOException {
|
||||
assert dir.hasWriteLock();
|
||||
final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
|
||||
if (dir.isNonEmptyDirectory(srcIIP)) {
|
||||
throw new IOException(
|
||||
"Attempt to create an erasure coding zone for a " +
|
||||
"non-empty directory.");
|
||||
}
|
||||
if (srcIIP != null &&
|
||||
srcIIP.getLastINode() != null &&
|
||||
!srcIIP.getLastINode().isDirectory()) {
|
||||
throw new IOException("Attempt to create an erasure coding zone " +
|
||||
"for a file.");
|
||||
}
|
||||
if (getECPolicy(srcIIP)) {
|
||||
throw new IOException("Directory " + src + " is already in an " +
|
||||
"erasure coding zone.");
|
||||
}
|
||||
final XAttr ecXAttr = XAttrHelper
|
||||
.buildXAttr(XATTR_ERASURECODING_ZONE, null);
|
||||
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
|
||||
xattrs.add(ecXAttr);
|
||||
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
|
||||
EnumSet.of(XAttrSetFlag.CREATE));
|
||||
return ecXAttr;
|
||||
}
|
||||
|
||||
void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
|
||||
throws IOException {
|
||||
assert dir.hasReadLock();
|
||||
if (getECPolicy(srcIIP)
|
||||
!= getECPolicy(dstIIP)) {
|
||||
throw new IOException(
|
||||
src + " can't be moved because the source and destination have " +
|
||||
"different erasure coding policies.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -185,6 +185,7 @@ class FSDirRenameOp {
|
|||
}
|
||||
|
||||
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
|
||||
fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
|
||||
// Ensure dst has quota to accommodate rename
|
||||
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
|
||||
verifyQuotaForRename(fsd, srcIIP, dstIIP);
|
||||
|
@ -357,6 +358,7 @@ class FSDirRenameOp {
|
|||
|
||||
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
|
||||
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
|
||||
fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
|
||||
final INode dstInode = dstIIP.getLastINode();
|
||||
List<INodeDirectory> snapshottableDirs = new ArrayList<>();
|
||||
if (dstInode != null) { // Destination exists
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
|
@ -482,9 +483,6 @@ class FSDirWriteFileOp {
|
|||
try {
|
||||
INodesInPath iip = fsd.addINode(existing, newNode);
|
||||
if (iip != null) {
|
||||
if (newNode.isStriped()) {
|
||||
newNode.addStripedBlocksFeature();
|
||||
}
|
||||
if (aclEntries != null) {
|
||||
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
|
||||
}
|
||||
|
@ -560,9 +558,6 @@ class FSDirWriteFileOp {
|
|||
fsd.writeLock();
|
||||
try {
|
||||
newiip = fsd.addINode(existing, newNode);
|
||||
if (newiip != null && newNode.isStriped()) {
|
||||
newNode.addStripedBlocksFeature();
|
||||
}
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
|
@ -610,7 +605,7 @@ class FSDirWriteFileOp {
|
|||
}
|
||||
}
|
||||
final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
|
||||
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
|
||||
BlockInfo lastBlockInFile = file.getLastBlock();
|
||||
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
||||
// The block that the client claims is the current last block
|
||||
// doesn't match up with what we think is the last block. There are
|
||||
|
@ -638,7 +633,7 @@ class FSDirWriteFileOp {
|
|||
// changed the namesystem state yet.
|
||||
// We run this analysis again in Part II where case 4 is impossible.
|
||||
|
||||
BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
|
||||
BlockInfo penultimateBlock = file.getPenultimateBlock();
|
||||
if (previous == null &&
|
||||
lastBlockInFile != null &&
|
||||
lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
|
||||
|
|
|
@ -205,6 +205,9 @@ public class FSDirectory implements Closeable {
|
|||
@VisibleForTesting
|
||||
public final EncryptionZoneManager ezManager;
|
||||
|
||||
@VisibleForTesting
|
||||
public final ErasureCodingZoneManager ecZoneManager;
|
||||
|
||||
/**
|
||||
* Caches frequently used file names used in {@link INode} to reuse
|
||||
* byte[] objects and reduce heap usage.
|
||||
|
@ -296,6 +299,7 @@ public class FSDirectory implements Closeable {
|
|||
namesystem = ns;
|
||||
this.editLog = ns.getEditLog();
|
||||
ezManager = new EncryptionZoneManager(this, conf);
|
||||
ecZoneManager = new ErasureCodingZoneManager(this);
|
||||
}
|
||||
|
||||
FSNamesystem getFSNamesystem() {
|
||||
|
@ -1221,6 +1225,25 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
XAttr createErasureCodingZone(String src)
|
||||
throws IOException {
|
||||
writeLock();
|
||||
try {
|
||||
return ecZoneManager.createErasureCodingZone(src);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getECPolicy(INodesInPath iip) {
|
||||
readLock();
|
||||
try {
|
||||
return ecZoneManager.getECPolicy(iip);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
|
||||
INode inode = iip.getLastINode();
|
||||
if (inode == null) {
|
||||
|
|
|
@ -7510,6 +7510,46 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an erasure coding zone on directory src.
|
||||
*
|
||||
* @param src the path of a directory which will be the root of the
|
||||
* erasure coding zone. The directory must be empty.
|
||||
* @throws AccessControlException if the caller is not the superuser.
|
||||
* @throws UnresolvedLinkException if the path can't be resolved.
|
||||
* @throws SafeModeException if the Namenode is in safe mode.
|
||||
*/
|
||||
void createErasureCodingZone(final String srcArg,
|
||||
final boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException,
|
||||
SafeModeException, AccessControlException {
|
||||
String src = srcArg;
|
||||
HdfsFileStatus resultingStat = null;
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final byte[][] pathComponents =
|
||||
FSDirectory.getPathComponentsForReservedPath(src);
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
writeLock();
|
||||
try {
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
|
||||
final XAttr ecXAttr = dir.createErasureCodingZone(src);
|
||||
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
||||
xAttrs.add(ecXAttr);
|
||||
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
||||
final INodesInPath iip = dir.getINodesInPath4Write(src, false);
|
||||
resultingStat = dir.getAuditFileInfo(iip);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
getEditLog().logSync();
|
||||
logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
|
||||
}
|
||||
|
||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
||||
boolean logRetryCache)
|
||||
throws IOException {
|
||||
|
|
|
@ -1114,8 +1114,7 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
*/
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
// TODO: move erasure coding policy to file XAttr
|
||||
public boolean isStriped() {
|
||||
return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID;
|
||||
return getStripedBlocksFeature() != null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1820,6 +1820,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
return namesystem.listEncryptionZones(prevId);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void createErasureCodingZone(String src)
|
||||
throws IOException {
|
||||
checkNNStartup();
|
||||
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.createErasureCodingZone(src, cacheEntry != null);
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||
throws IOException {
|
||||
|
|
|
@ -714,6 +714,13 @@ message GetEditsFromTxidResponseProto {
|
|||
required EventsListProto eventsList = 1;
|
||||
}
|
||||
|
||||
message CreateErasureCodingZoneRequestProto {
|
||||
required string src = 1;
|
||||
}
|
||||
|
||||
message CreateErasureCodingZoneResponseProto {
|
||||
}
|
||||
|
||||
service ClientNamenodeProtocol {
|
||||
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
||||
returns(GetBlockLocationsResponseProto);
|
||||
|
@ -856,6 +863,8 @@ service ClientNamenodeProtocol {
|
|||
returns(ListEncryptionZonesResponseProto);
|
||||
rpc getEZForPath(GetEZForPathRequestProto)
|
||||
returns(GetEZForPathResponseProto);
|
||||
rpc createErasureCodingZone(CreateErasureCodingZoneRequestProto)
|
||||
returns(CreateErasureCodingZoneResponseProto);
|
||||
rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto)
|
||||
returns(GetCurrentEditLogTxidResponseProto);
|
||||
rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
|
||||
|
|
|
@ -119,9 +119,6 @@ public class TestBlockStoragePolicy {
|
|||
expectedPolicyStrings.put(COLD,
|
||||
"BlockStoragePolicy{COLD:" + COLD + ", storageTypes=[ARCHIVE], " +
|
||||
"creationFallbacks=[], replicationFallbacks=[]}");
|
||||
expectedPolicyStrings.put(EC,
|
||||
"BlockStoragePolicy{EC:" + EC + ", storageTypes=[DISK], " +
|
||||
"creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
|
||||
expectedPolicyStrings.put(WARM,
|
||||
"BlockStoragePolicy{WARM:" + WARM + ", storageTypes=[DISK, ARCHIVE], " +
|
||||
"creationFallbacks=[DISK, ARCHIVE], " +
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestErasureCodingZones {
|
||||
private final int NUM_OF_DATANODES = 3;
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private static final int BLOCK_SIZE = 1024;
|
||||
private FSNamesystem namesystem;
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws IOException {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
cluster = new MiniDFSCluster.Builder(conf).
|
||||
numDataNodes(NUM_OF_DATANODES).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
namesystem = cluster.getNamesystem();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdownCluster() throws IOException {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateECZone()
|
||||
throws IOException, InterruptedException {
|
||||
final Path testDir = new Path("/ec");
|
||||
fs.mkdir(testDir, FsPermission.getDirDefault());
|
||||
|
||||
/* Normal creation of an erasure coding zone */
|
||||
fs.getClient().createErasureCodingZone(testDir.toString());
|
||||
|
||||
/* Verify files under the zone are striped */
|
||||
final Path ECFilePath = new Path(testDir, "foo");
|
||||
fs.create(ECFilePath);
|
||||
INode inode = namesystem.getFSDirectory().getINode(ECFilePath.toString());
|
||||
assertTrue(inode.asFile().isStriped());
|
||||
|
||||
/* Verify that EC zone cannot be created on non-empty dir */
|
||||
final Path notEmpty = new Path("/nonEmpty");
|
||||
fs.mkdir(notEmpty, FsPermission.getDirDefault());
|
||||
fs.create(new Path(notEmpty, "foo"));
|
||||
try {
|
||||
fs.getClient().createErasureCodingZone(notEmpty.toString());
|
||||
fail("Erasure coding zone on non-empty dir");
|
||||
} catch (IOException e) {
|
||||
assertExceptionContains("erasure coding zone for a non-empty directory", e);
|
||||
}
|
||||
|
||||
/* Verify that nested EC zones cannot be created */
|
||||
final Path zone1 = new Path("/zone1");
|
||||
final Path zone2 = new Path(zone1, "zone2");
|
||||
fs.mkdir(zone1, FsPermission.getDirDefault());
|
||||
fs.getClient().createErasureCodingZone(zone1.toString());
|
||||
fs.mkdir(zone2, FsPermission.getDirDefault());
|
||||
try {
|
||||
fs.getClient().createErasureCodingZone(zone2.toString());
|
||||
fail("Nested erasure coding zones");
|
||||
} catch (IOException e) {
|
||||
assertExceptionContains("already in an erasure coding zone", e);
|
||||
}
|
||||
|
||||
/* Verify that EC zone cannot be created on a file */
|
||||
final Path fPath = new Path("/file");
|
||||
fs.create(fPath);
|
||||
try {
|
||||
fs.getClient().createErasureCodingZone(fPath.toString());
|
||||
fail("Erasure coding zone on file");
|
||||
} catch (IOException e) {
|
||||
assertExceptionContains("erasure coding zone for a file", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveValidity() throws IOException, InterruptedException {
|
||||
final Path srcECDir = new Path("/srcEC");
|
||||
final Path dstECDir = new Path("/dstEC");
|
||||
fs.mkdir(srcECDir, FsPermission.getDirDefault());
|
||||
fs.mkdir(dstECDir, FsPermission.getDirDefault());
|
||||
fs.getClient().createErasureCodingZone(srcECDir.toString());
|
||||
fs.getClient().createErasureCodingZone(dstECDir.toString());
|
||||
final Path srcFile = new Path(srcECDir, "foo");
|
||||
fs.create(srcFile);
|
||||
|
||||
/* Verify that a file can be moved between 2 EC zones */
|
||||
try {
|
||||
fs.rename(srcFile, dstECDir);
|
||||
} catch (IOException e) {
|
||||
fail("A file should be able to move between 2 EC zones " + e);
|
||||
}
|
||||
|
||||
// Move the file back
|
||||
fs.rename(new Path(dstECDir, "foo"), srcECDir);
|
||||
|
||||
/* Verify that a file cannot be moved from a non-EC dir to an EC zone */
|
||||
final Path nonECDir = new Path("/nonEC");
|
||||
fs.mkdir(nonECDir, FsPermission.getDirDefault());
|
||||
try {
|
||||
fs.rename(srcFile, nonECDir);
|
||||
fail("A file shouldn't be able to move from a non-EC dir to an EC zone");
|
||||
} catch (IOException e) {
|
||||
assertExceptionContains("can't be moved because the source and " +
|
||||
"destination have different erasure coding policies", e);
|
||||
}
|
||||
|
||||
/* Verify that a file cannot be moved from an EC zone to a non-EC dir */
|
||||
final Path nonECFile = new Path(nonECDir, "nonECFile");
|
||||
fs.create(nonECFile);
|
||||
try {
|
||||
fs.rename(nonECFile, dstECDir);
|
||||
} catch (IOException e) {
|
||||
assertExceptionContains("can't be moved because the source and " +
|
||||
"destination have different erasure coding policies", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_ID;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestBlockInitialEncoding {
|
||||
private final int NUM_OF_DATANODES = 3;
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private static final int BLOCK_SIZE = 1024;
|
||||
private HdfsAdmin dfsAdmin;
|
||||
private FSNamesystem namesystem;
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws IOException {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
cluster = new MiniDFSCluster.Builder(conf).
|
||||
numDataNodes(NUM_OF_DATANODES).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
|
||||
namesystem = cluster.getNamesystem();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdownCluster() throws IOException {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockInitialEncoding()
|
||||
throws IOException, InterruptedException {
|
||||
final Path testDir = new Path("/test");
|
||||
fs.mkdir(testDir, FsPermission.getDirDefault());
|
||||
dfsAdmin.setStoragePolicy(testDir, EC_STORAGE_POLICY_NAME);
|
||||
final Path ECFilePath = new Path("/test/foo.ec");
|
||||
DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, (short) 3, 0);
|
||||
INode inode = namesystem.getFSDirectory().getINode(ECFilePath.toString());
|
||||
assertEquals(EC_STORAGE_POLICY_ID, inode.getStoragePolicyID());
|
||||
}
|
||||
|
||||
}
|
|
@ -68,7 +68,7 @@ public class TestAddStripedBlocks {
|
|||
.numDataNodes(GROUP_SIZE).build();
|
||||
cluster.waitActive();
|
||||
dfs = cluster.getFileSystem();
|
||||
dfs.setStoragePolicy(new Path("/"), HdfsConstants.EC_STORAGE_POLICY_NAME);
|
||||
dfs.getClient().createErasureCodingZone("/");
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -445,8 +445,7 @@ public class TestFSEditLogLoader {
|
|||
|
||||
//set the storage policy of the directory
|
||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||
fs.setStoragePolicy(new Path(testDir),
|
||||
HdfsConstants.EC_STORAGE_POLICY_NAME);
|
||||
fs.getClient().getNamenode().createErasureCodingZone(testDir);
|
||||
|
||||
// Create a file with striped block
|
||||
Path p = new Path(testFilePath);
|
||||
|
@ -518,8 +517,7 @@ public class TestFSEditLogLoader {
|
|||
|
||||
//set the storage policy of the directory
|
||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||
fs.setStoragePolicy(new Path(testDir),
|
||||
HdfsConstants.EC_STORAGE_POLICY_NAME);
|
||||
fs.getClient().getNamenode().createErasureCodingZone(testDir);
|
||||
|
||||
//create a file with striped blocks
|
||||
Path p = new Path(testFilePath);
|
||||
|
|
|
@ -33,18 +33,14 @@ import java.io.IOException;
|
|||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -59,7 +55,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
|
@ -137,9 +132,10 @@ public class TestFSImage {
|
|||
}
|
||||
}
|
||||
|
||||
private void testSaveAndLoadINodeFile(FSNamesystem fsn, Configuration conf,
|
||||
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
|
||||
boolean isUC) throws IOException{
|
||||
// contruct a INode with StripedBlock for saving and loading
|
||||
fsn.createErasureCodingZone("/", false);
|
||||
long id = 123456789;
|
||||
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
|
||||
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
|
||||
|
@ -149,9 +145,8 @@ public class TestFSImage {
|
|||
BlockInfoContiguous[] blks = new BlockInfoContiguous[0];
|
||||
short replication = 3;
|
||||
long preferredBlockSize = 128*1024*1024;
|
||||
byte storagePolicyID = HdfsConstants.EC_STORAGE_POLICY_ID;
|
||||
INodeFile file = new INodeFile(id, name, permissionStatus, mtime, atime,
|
||||
blks, replication, preferredBlockSize, storagePolicyID);
|
||||
blks, replication, preferredBlockSize);
|
||||
ByteArrayOutputStream bs = new ByteArrayOutputStream();
|
||||
file.addStripedBlocksFeature();
|
||||
|
||||
|
@ -237,13 +232,13 @@ public class TestFSImage {
|
|||
* FSImageSerialization and loaded by FSImageFormat#Loader.
|
||||
*/
|
||||
@Test
|
||||
public void testSaveAndLoadInodeFile() throws IOException{
|
||||
public void testSaveAndLoadStripedINodeFile() throws IOException{
|
||||
Configuration conf = new Configuration();
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
testSaveAndLoadINodeFile(cluster.getNamesystem(), conf, false);
|
||||
testSaveAndLoadStripedINodeFile(cluster.getNamesystem(), conf, false);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -256,14 +251,14 @@ public class TestFSImage {
|
|||
* saved and loaded by FSImageSerialization
|
||||
*/
|
||||
@Test
|
||||
public void testSaveAndLoadInodeFileUC() throws IOException{
|
||||
public void testSaveAndLoadStripedINodeFileUC() throws IOException{
|
||||
// construct a INode with StripedBlock for saving and loading
|
||||
Configuration conf = new Configuration();
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
testSaveAndLoadINodeFile(cluster.getNamesystem(), conf, true);
|
||||
testSaveAndLoadStripedINodeFile(cluster.getNamesystem(), conf, true);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -402,7 +397,7 @@ public class TestFSImage {
|
|||
.build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
fs.setStoragePolicy(new Path("/"), HdfsConstants.EC_STORAGE_POLICY_NAME);
|
||||
fs.getClient().getNamenode().createErasureCodingZone("/");
|
||||
Path file = new Path("/striped");
|
||||
FSDataOutputStream out = fs.create(file);
|
||||
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
|
@ -37,23 +36,19 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -93,7 +88,7 @@ public class TestRecoverStripedBlocks {
|
|||
int numBlocks) throws Exception {
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
dfs.mkdirs(dir);
|
||||
dfs.setStoragePolicy(dir, EC_STORAGE_POLICY_NAME);
|
||||
dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
|
||||
|
||||
FSDataOutputStream out = null;
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue