HDFS-12258. ec -listPolicies should list all policies in system, no matter it's enabled or disabled. Contributed by Wei Zhou.

This commit is contained in:
Rakesh Radhakrishnan 2017-08-30 12:58:56 +05:30
parent 32cba6c303
commit 200b11368d
15 changed files with 388 additions and 213 deletions

View File

@ -17,6 +17,35 @@
*/
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
@ -32,45 +61,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import org.apache.htrace.core.TraceScope;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@ -777,7 +776,8 @@ public class DFSStripedOutputStream extends DFSOutputStream
// should update the block group length based on the acked length
final long sentBytes = currentBlockGroup.getNumBytes();
final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
Preconditions.checkState(ackedBytes <= sentBytes);
Preconditions.checkState(ackedBytes <= sentBytes,
"Acked:" + ackedBytes + ", Sent:" + sentBytes);
currentBlockGroup.setNumBytes(ackedBytes);
newBG.setNumBytes(ackedBytes);
dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,

View File

@ -18,21 +18,14 @@
package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CacheFlag;
@ -53,24 +46,24 @@ import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@ -81,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
@ -95,17 +89,22 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
@ -2603,7 +2602,8 @@ public class DistributedFileSystem extends FileSystem {
/**
* Retrieve all the erasure coding policies supported by this file system,
* excluding REPLICATION policy.
* including enabled, disabled and removed policies, but excluding
* REPLICATION policy.
*
* @return all erasure coding policies supported by this file system.
* @throws IOException
@ -2628,8 +2628,9 @@ public class DistributedFileSystem extends FileSystem {
/**
* Add Erasure coding policies to HDFS. For each policy input, schema and
* cellSize are musts, name and id are ignored. They will be automatically
* created and assigned by Namenode once the policy is successfully added, and
* will be returned in the response.
* created and assigned by Namenode once the policy is successfully added,
* and will be returned in the response; policy states will be set to
* DISABLED automatically.
*
* @param policies The user defined ec policy list to add.
* @return Return the response list of adding operations.

View File

@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hdfs.client;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.EnumSet;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -50,10 +44,16 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.security.AccessControlException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.EnumSet;
/**
* The public API for performing administrative functions on HDFS. Those writing
* applications against HDFS should prefer this interface to directly accessing
@ -554,8 +554,9 @@ public class HdfsAdmin {
/**
* Add Erasure coding policies to HDFS. For each policy input, schema and
* cellSize are musts, name and id are ignored. They will be automatically
* created and assigned by Namenode once the policy is successfully added, and
* will be returned in the response.
* created and assigned by Namenode once the policy is successfully added,
* and will be returned in the response; policy states will be set to
* DISABLED automatically.
*
* @param policies The user defined ec policy list to add.
* @return Return the response list of adding operations.

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.Serializable;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
@ -27,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import java.io.Serializable;
/**
* A policy about how to write/read/code an erasure coding file.
*/
@ -40,9 +40,11 @@ public final class ErasureCodingPolicy implements Serializable {
private final ECSchema schema;
private final int cellSize;
private byte id;
private ErasureCodingPolicyState state;
public ErasureCodingPolicy(String name, ECSchema schema,
int cellSize, byte id) {
int cellSize, byte id, ErasureCodingPolicyState state) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(schema);
Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
@ -52,14 +54,22 @@ public final class ErasureCodingPolicy implements Serializable {
this.schema = schema;
this.cellSize = cellSize;
this.id = id;
this.state = state;
}
public ErasureCodingPolicy(String name, ECSchema schema, int cellSize,
byte id) {
this(name, schema, cellSize, id, ErasureCodingPolicyState.DISABLED);
}
public ErasureCodingPolicy(ECSchema schema, int cellSize, byte id) {
this(composePolicyName(schema, cellSize), schema, cellSize, id);
this(composePolicyName(schema, cellSize), schema, cellSize, id,
ErasureCodingPolicyState.DISABLED);
}
public ErasureCodingPolicy(ECSchema schema, int cellSize) {
this(composePolicyName(schema, cellSize), schema, cellSize, (byte) -1);
this(composePolicyName(schema, cellSize), schema, cellSize, (byte) -1,
ErasureCodingPolicyState.DISABLED);
}
public static String composePolicyName(ECSchema schema, int cellSize) {
@ -108,10 +118,35 @@ public final class ErasureCodingPolicy implements Serializable {
this.id = id;
}
public boolean isReplicationPolicy() {
return (id == ErasureCodeConstants.REPLICATION_POLICY_ID);
}
public ErasureCodingPolicyState getState() {
return state;
}
public void setState(ErasureCodingPolicyState state) {
this.state = state;
}
public boolean isSystemPolicy() {
return (this.id < ErasureCodeConstants.USER_DEFINED_POLICY_START_ID);
}
public boolean isEnabled() {
return (this.state == ErasureCodingPolicyState.ENABLED);
}
public boolean isDisabled() {
return (this.state == ErasureCodingPolicyState.DISABLED);
}
public boolean isRemoved() {
return (this.state == ErasureCodingPolicyState.REMOVED);
}
@Override
public boolean equals(Object o) {
if (o == null) {
@ -129,6 +164,7 @@ public final class ErasureCodingPolicy implements Serializable {
.append(schema, rhs.schema)
.append(cellSize, rhs.cellSize)
.append(id, rhs.id)
.append(state, rhs.state)
.isEquals();
}
@ -139,6 +175,7 @@ public final class ErasureCodingPolicy implements Serializable {
.append(schema)
.append(cellSize)
.append(id)
.append(state)
.toHashCode();
}
@ -147,7 +184,8 @@ public final class ErasureCodingPolicy implements Serializable {
return "ErasureCodingPolicy=[" + "Name=" + name + ", "
+ "Schema=[" + schema.toString() + "], "
+ "CellSize=" + cellSize + ", "
+ "Id=" + id
+ "Id=" + id + ", "
+ "State=" + state.toString()
+ "]";
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Value denotes the possible states of an ErasureCodingPolicy.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum ErasureCodingPolicyState {
/** Policy is disabled. It's policy default state. */
DISABLED(1),
/** Policy is enabled. It can be applied to directory and file. */
ENABLED(2),
/**
* Policy is removed from the system. Due to there are potential files
* use this policy, it cannot be deleted from system immediately. A removed
* policy can be re-enabled later.*/
REMOVED(3);
private static final ErasureCodingPolicyState[] CACHED_VALUES =
ErasureCodingPolicyState.values();
private final int value;
ErasureCodingPolicyState(int v) {
value = v;
}
public int getValue() {
return value;
}
public static ErasureCodingPolicyState fromValue(int v) {
if (v > 0 && v <= CACHED_VALUES.length) {
return CACHED_VALUES[v - 1];
}
return null;
}
/** Read from in. */
public static ErasureCodingPolicyState read(DataInput in) throws IOException {
return fromValue(in.readByte());
}
/** Write to out. */
public void write(DataOutput out) throws IOException {
out.writeByte(ordinal());
}
}

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -2925,6 +2926,16 @@ public class PBHelperClient {
return builder.build();
}
public static ErasureCodingPolicyState convertECState(
HdfsProtos.ErasureCodingPolicyState state) {
return ErasureCodingPolicyState.fromValue(state.getNumber());
}
public static HdfsProtos.ErasureCodingPolicyState convertECState(
ErasureCodingPolicyState state) {
return HdfsProtos.ErasureCodingPolicyState.valueOf(state.getValue());
}
public static ErasureCodingPolicy convertErasureCodingPolicy(
ErasureCodingPolicyProto proto) {
final byte id = (byte) (proto.getId() & 0xFF);
@ -2938,10 +2949,12 @@ public class PBHelperClient {
"Missing schema field in ErasureCodingPolicy proto");
Preconditions.checkArgument(proto.hasCellSize(),
"Missing cellsize field in ErasureCodingPolicy proto");
Preconditions.checkArgument(proto.hasState(),
"Missing state field in ErasureCodingPolicy proto");
return new ErasureCodingPolicy(proto.getName(),
convertECSchema(proto.getSchema()),
proto.getCellSize(), id);
proto.getCellSize(), id, convertECState(proto.getState()));
}
return policy;
}
@ -2955,7 +2968,8 @@ public class PBHelperClient {
if (SystemErasureCodingPolicies.getByID(policy.getId()) == null) {
builder.setName(policy.getName())
.setSchema(convertECSchema(policy.getSchema()))
.setCellSize(policy.getCellSize());
.setCellSize(policy.getCellSize())
.setState(convertECState(policy.getState()));
}
return builder.build();
}

View File

@ -373,11 +373,21 @@ message ECSchemaProto {
repeated ECSchemaOptionEntryProto options = 4;
}
/**
* EC policy state.
*/
enum ErasureCodingPolicyState {
DISABLED = 1;
ENABLED = 2;
REMOVED = 3;
}
message ErasureCodingPolicyProto {
optional string name = 1;
optional ECSchemaProto schema = 2;
optional uint32 cellSize = 3;
required uint32 id = 4; // Actually a byte - only 8 bits used
optional ErasureCodingPolicyState state = 5 [default = ENABLED];
}
message AddECPolicyResponseProto {

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -31,11 +33,11 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* This manages erasure coding policies predefined and activated in the system.
@ -60,25 +62,32 @@ public final class ErasureCodingPolicyManager {
HdfsConstants.ALLSSD_STORAGE_POLICY_ID};
/**
* All user defined policies sorted by name for fast querying.
* All policies sorted by name for fast querying, include built-in policy,
* user defined policy, removed policy.
*/
private Map<String, ErasureCodingPolicy> userPoliciesByName;
private Map<String, ErasureCodingPolicy> policiesByName;
/**
* All user defined policies sorted by ID for fast querying.
* All policies sorted by ID for fast querying, including built-in policy,
* user defined policy, removed policy.
*/
private Map<Byte, ErasureCodingPolicy> userPoliciesByID;
private Map<Byte, ErasureCodingPolicy> policiesByID;
/**
* All removed policies sorted by name.
* For better performance when query all Policies.
*/
private Map<String, ErasureCodingPolicy> removedPoliciesByName;
private ErasureCodingPolicy[] allPolicies;
/**
* All enabled policies maintained in NN memory for fast querying,
* identified and sorted by its name.
* All enabled policies sorted by name for fast querying, including built-in
* policy, user defined policy.
*/
private Map<String, ErasureCodingPolicy> enabledPoliciesByName;
/**
* For better performance when query all enabled Policies.
*/
private ErasureCodingPolicy[] enabledPolicies;
private volatile static ErasureCodingPolicyManager instance = null;
@ -101,46 +110,51 @@ public final class ErasureCodingPolicyManager {
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
final String[] policyNames =
(String[]) ArrayUtils.add(enablePolicyNames, defaultPolicyName);
this.userPoliciesByID = new TreeMap<>();
this.userPoliciesByName = new TreeMap<>();
this.removedPoliciesByName = new TreeMap<>();
this.policiesByName = new TreeMap<>();
this.policiesByID = new TreeMap<>();
this.enabledPoliciesByName = new TreeMap<>();
/**
* TODO: load user defined EC policy from fsImage HDFS-7859
* load persistent policies from image and editlog, which is done only once
* during NameNode startup. This can be done here or in a separate method.
*/
/*
* Add all System built-in policies into policy map
*/
for (ErasureCodingPolicy policy :
SystemErasureCodingPolicies.getPolicies()) {
policiesByName.put(policy.getName(), policy);
policiesByID.put(policy.getId(), policy);
}
for (String policyName : policyNames) {
if (policyName.trim().isEmpty()) {
continue;
}
ErasureCodingPolicy ecPolicy =
SystemErasureCodingPolicies.getByName(policyName);
ErasureCodingPolicy ecPolicy = policiesByName.get(policyName);
if (ecPolicy == null) {
ecPolicy = userPoliciesByName.get(policyName);
if (ecPolicy == null) {
String allPolicies = SystemErasureCodingPolicies.getPolicies()
String names = policiesByName.values()
.stream().map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", ")) + ", " +
userPoliciesByName.values().stream()
.map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", "));
String msg = String.format("EC policy '%s' specified at %s is not a "
+ "valid policy. Please choose from list of available "
+ "policies: [%s]",
policyName,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
allPolicies);
names);
throw new IllegalArgumentException(msg);
}
}
enabledPoliciesByName.put(ecPolicy.getName(), ecPolicy);
}
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
allPolicies = policiesByName.values().toArray(new ErasureCodingPolicy[0]);
maxCellSize = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT);
/**
* TODO: HDFS-7859 persist into NameNode
* load persistent policies from image and editlog, which is done only once
* during NameNode startup. This can be done here or in a separate method.
*/
}
/**
@ -148,9 +162,7 @@ public final class ErasureCodingPolicyManager {
* @return all policies
*/
public ErasureCodingPolicy[] getEnabledPolicies() {
ErasureCodingPolicy[] results =
new ErasureCodingPolicy[enabledPoliciesByName.size()];
return enabledPoliciesByName.values().toArray(results);
return enabledPolicies;
}
/**
@ -187,9 +199,7 @@ public final class ErasureCodingPolicyManager {
* @return all policies
*/
public ErasureCodingPolicy[] getPolicies() {
return Stream.concat(SystemErasureCodingPolicies.getPolicies().stream(),
userPoliciesByName.values().stream())
.toArray(ErasureCodingPolicy[]::new);
return allPolicies;
}
/**
@ -197,11 +207,7 @@ public final class ErasureCodingPolicyManager {
* @return ecPolicy, or null if not found
*/
public ErasureCodingPolicy getByID(byte id) {
ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(id);
if (policy == null) {
return this.userPoliciesByID.get(id);
}
return policy;
return this.policiesByID.get(id);
}
/**
@ -209,11 +215,7 @@ public final class ErasureCodingPolicyManager {
* @return ecPolicy, or null if not found
*/
public ErasureCodingPolicy getByName(String name) {
ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByName(name);
if (policy == null) {
return this.userPoliciesByName.get(name);
}
return policy;
return this.policiesByName.get(name);
}
/**
@ -230,6 +232,9 @@ public final class ErasureCodingPolicyManager {
*/
public synchronized ErasureCodingPolicy addPolicy(ErasureCodingPolicy policy)
throws IllegalECPolicyException {
// Set policy state into DISABLED when adding into Hadoop.
policy.setState(ErasureCodingPolicyState.DISABLED);
if (!CodecUtil.hasCodec(policy.getCodecName())) {
throw new IllegalECPolicyException("Codec name "
+ policy.getCodecName() + " is not supported");
@ -256,15 +261,17 @@ public final class ErasureCodingPolicyManager {
}
policy.setName(assignedNewName);
policy.setId(getNextAvailablePolicyID());
this.userPoliciesByName.put(policy.getName(), policy);
this.userPoliciesByID.put(policy.getId(), policy);
this.policiesByName.put(policy.getName(), policy);
this.policiesByID.put(policy.getId(), policy);
allPolicies = policiesByName.values().toArray(new ErasureCodingPolicy[0]);
return policy;
}
private byte getNextAvailablePolicyID() {
byte currentId = this.userPoliciesByID.keySet().stream()
.max(Byte::compareTo).orElse(
ErasureCodeConstants.USER_DEFINED_POLICY_START_ID);
byte currentId = this.policiesByID.keySet().stream()
.max(Byte::compareTo)
.filter(id -> id >= ErasureCodeConstants.USER_DEFINED_POLICY_START_ID)
.orElse(ErasureCodeConstants.USER_DEFINED_POLICY_START_ID);
return (byte) (currentId + 1);
}
@ -272,69 +279,71 @@ public final class ErasureCodingPolicyManager {
* Remove an User erasure coding policy by policyName.
*/
public synchronized void removePolicy(String name) {
if (SystemErasureCodingPolicies.getByName(name) != null) {
throw new IllegalArgumentException("System erasure coding policy " +
name + " cannot be removed");
}
ErasureCodingPolicy policy = userPoliciesByName.get(name);
if (policy == null) {
ErasureCodingPolicy ecPolicy = policiesByName.get(name);
if (ecPolicy == null) {
throw new IllegalArgumentException("The policy name " +
name + " does not exists");
}
enabledPoliciesByName.remove(name);
removedPoliciesByName.put(name, policy);
if (ecPolicy.isSystemPolicy()) {
throw new IllegalArgumentException("System erasure coding policy " +
name + " cannot be removed");
}
if (enabledPoliciesByName.containsKey(name)) {
enabledPoliciesByName.remove(name);
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
}
ecPolicy.setState(ErasureCodingPolicyState.REMOVED);
LOG.info("Remove erasure coding policy " + name);
}
@VisibleForTesting
public List<ErasureCodingPolicy> getRemovedPolicies() {
return removedPoliciesByName.values().stream().collect(Collectors.toList());
ArrayList<ErasureCodingPolicy> removedPolicies =
new ArrayList<ErasureCodingPolicy>();
for (ErasureCodingPolicy ecPolicy : policiesByName.values()) {
if (ecPolicy.isRemoved()) {
removedPolicies.add(ecPolicy);
}
}
return removedPolicies;
}
/**
* Disable an erasure coding policy by policyName.
*/
public synchronized void disablePolicy(String name) {
ErasureCodingPolicy sysEcPolicy = SystemErasureCodingPolicies
.getByName(name);
ErasureCodingPolicy userEcPolicy = userPoliciesByName.get(name);
LOG.info("Disable the erasure coding policy " + name);
if (sysEcPolicy == null &&
userEcPolicy == null) {
ErasureCodingPolicy ecPolicy = policiesByName.get(name);
if (ecPolicy == null) {
throw new IllegalArgumentException("The policy name " +
name + " does not exists");
}
if(sysEcPolicy != null){
if (enabledPoliciesByName.containsKey(name)) {
enabledPoliciesByName.remove(name);
removedPoliciesByName.put(name, sysEcPolicy);
}
if(userEcPolicy != null){
enabledPoliciesByName.remove(name);
removedPoliciesByName.put(name, userEcPolicy);
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
}
ecPolicy.setState(ErasureCodingPolicyState.DISABLED);
LOG.info("Disable the erasure coding policy " + name);
}
/**
* Enable an erasure coding policy by policyName.
*/
public synchronized void enablePolicy(String name) {
ErasureCodingPolicy sysEcPolicy = SystemErasureCodingPolicies
.getByName(name);
ErasureCodingPolicy userEcPolicy = userPoliciesByName.get(name);
LOG.info("Enable the erasure coding policy " + name);
if (sysEcPolicy == null &&
userEcPolicy == null) {
ErasureCodingPolicy ecPolicy = policiesByName.get(name);
if (ecPolicy == null) {
throw new IllegalArgumentException("The policy name " +
name + " does not exists");
}
if(sysEcPolicy != null){
enabledPoliciesByName.put(name, sysEcPolicy);
removedPoliciesByName.remove(name);
enabledPoliciesByName.put(name, ecPolicy);
ecPolicy.setState(ErasureCodingPolicyState.ENABLED);
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
LOG.info("Enable the erasure coding policy " + name);
}
if(userEcPolicy != null) {
enabledPoliciesByName.put(name, userEcPolicy);
removedPoliciesByName.remove(name);
}
}
}

View File

@ -17,6 +17,23 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.erasurecode.CodecRegistry;
import org.apache.hadoop.security.AccessControlException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@ -29,24 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.erasurecode.CodecRegistry;
import org.apache.hadoop.security.AccessControlException;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_POLICY;
/**
@ -341,7 +340,7 @@ final class FSDirErasureCodingOp {
static ErasureCodingPolicy[] getErasureCodingPolicies(final FSNamesystem fsn)
throws IOException {
assert fsn.hasReadLock();
return fsn.getErasureCodingPolicyManager().getEnabledPolicies();
return fsn.getErasureCodingPolicyManager().getPolicies();
}
/**

View File

@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -100,9 +99,7 @@ public class ECAdmin extends Configured implements Tool {
@Override
public String getLongUsage() {
return getShortUsage() + "\n" +
"Get the list of enabled erasure coding policies.\n" +
"Policies can be enabled on the NameNode via `" +
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY + "`.\n";
"Get the list of all erasure coding policies.\n";
}
@Override
@ -117,17 +114,13 @@ public class ECAdmin extends Configured implements Tool {
Collection<ErasureCodingPolicy> policies =
dfs.getAllErasureCodingPolicies();
if (policies.isEmpty()) {
System.out.println("No erasure coding policies are enabled on the " +
System.out.println("There is no erasure coding policies in the " +
"cluster.");
System.out.println("The set of enabled policies can be " +
"configured at '" +
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY + "' on the " +
"NameNode.");
} else {
System.out.println("Erasure Coding Policies:");
for (ErasureCodingPolicy policy : policies) {
if (policy != null) {
System.out.println("\t" + policy.getName());
System.out.println(policy.toString());
}
}
}

View File

@ -192,7 +192,7 @@ Below are the details about each command.
* `[-listPolicies]`
Lists the set of enabled erasure coding policies. These names are suitable for use with the `setPolicy` command.
Lists all (enabled, disabled and removed) erasure coding policies registered in HDFS. Only the enabled policies are suitable for use with the `setPolicy` command.
* `[-addPolicies -policyFile <file>]`

View File

@ -1608,8 +1608,6 @@ public class TestDistributedFileSystem {
assertEquals(policyName, ErasureCodingPolicyManager.getInstance().
getByName(policyName).getName());
fs.disableErasureCodingPolicy(policyName);
assertEquals(policyName, ErasureCodingPolicyManager.getInstance().
getRemovedPolicies().get(0).getName());
fs.enableErasureCodingPolicy(policyName);
assertEquals(policyName, ErasureCodingPolicyManager.getInstance().
getByName(policyName).getName());

View File

@ -211,7 +211,8 @@ public class TestErasureCodingPolicies {
// Only default policy should be enabled after restart
Assert.assertEquals("Only default policy should be enabled after restart",
1, fs.getAllErasureCodingPolicies().size());
1,
ErasureCodingPolicyManager.getInstance().getEnabledPolicies().length);
// Already set directory-level policies should still be in effect
Path disabledPolicy = new Path(dir1, "afterDisabled");
@ -383,6 +384,18 @@ public class TestErasureCodingPolicies {
.getAllErasureCodingPolicies();
assertTrue("All system policies should be enabled",
allECPolicies.containsAll(SystemErasureCodingPolicies.getPolicies()));
// Query after add a new policy
ECSchema toAddSchema = new ECSchema("rs", 5, 2);
ErasureCodingPolicy newPolicy =
new ErasureCodingPolicy(toAddSchema, 128 * 1024);
ErasureCodingPolicy[] policyArray = new ErasureCodingPolicy[]{newPolicy};
fs.addErasureCodingPolicies(policyArray);
allECPolicies = fs.getAllErasureCodingPolicies();
assertEquals("Should return new added policy",
SystemErasureCodingPolicies.getPolicies().size() + 1,
allECPolicies.size());
}
@Test

View File

@ -17,15 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -42,6 +33,7 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -50,14 +42,23 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* This class tests INodeFile with striped feature.
*/
@ -88,6 +89,12 @@ public class TestStripedINodeFile {
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void init() {
Configuration conf = new HdfsConfiguration();
ErasureCodingPolicyManager.getInstance().init(conf);
}
@Test
public void testInvalidECPolicy() throws IllegalArgumentException {
thrown.expect(IllegalArgumentException.class);

View File

@ -135,7 +135,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Get the list of enabled erasure coding policies</expected-output>
<expected-output>Get the list of all erasure coding policies</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
@ -410,6 +410,25 @@
</comparators>
</test>
<test>
<description>listPolicies : get the list of ECPolicies supported</description>
<test-commands>
<ec-admin-command>-fs NAMENODE -listPolicies</ec-admin-command>
</test-commands>
<cleanup-commands>
</cleanup-commands>
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>XOR-2-1-128k</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>State=DISABLED</expected-output>
</comparator>
</comparators>
</test>
<test>
<description>enablePolicy : enable the erasure coding policy</description>
<test-commands>