HDFS-8156. Add/implement necessary APIs even we just have the system default schema. Contributed by Kai Zheng.

Zhe Zhang 2015-04-22 14:48:54 -07:00 committed by Zhe Zhang
parent 014d8675c5
commit e8df2581c3
10 changed files with 249 additions and 111 deletions

File: ECSchema.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
@@ -30,55 +31,80 @@ public final class ECSchema {
public static final String CHUNK_SIZE_KEY = "chunkSize";
public static final int DEFAULT_CHUNK_SIZE = 256 * 1024; // 256K
/**
* A friendly and understandable name that conveys what the schema is. It
* also serves as the identifier that distinguishes this schema from others.
*/
private final String schemaName;
/**
* The name of the associated erasure codec.
*/
private final String codecName;
/**
* Number of source data units coded.
*/
private final int numDataUnits;
/**
* Number of parity units generated in a coding operation.
*/
private final int numParityUnits;
/**
* Unit data size for each chunk in a coding operation.
*/
private final int chunkSize;
/*
* An erasure code can have its own specific advanced parameters; it is up
* to the codec itself to interpret these key-value settings.
*/
private final Map<String, String> extraOptions;
/**
* Constructor with schema name and all provided options. Note the options may
* contain additional information for the erasure codec to interpret further.
* @param schemaName schema name
* @param allOptions all schema options
*/
public ECSchema(String schemaName, Map<String, String> allOptions) {
assert (schemaName != null && ! schemaName.isEmpty());
this.schemaName = schemaName;
if (allOptions == null || allOptions.isEmpty()) {
throw new IllegalArgumentException("No schema options are provided");
}
this.codecName = allOptions.get(CODEC_NAME_KEY);
if (codecName == null || codecName.isEmpty()) {
throw new IllegalArgumentException("No codec option is provided");
}
int tmpNumDataUnits = extractIntOption(NUM_DATA_UNITS_KEY, allOptions);
int tmpNumParityUnits = extractIntOption(NUM_PARITY_UNITS_KEY, allOptions);
if (tmpNumDataUnits < 0 || tmpNumParityUnits < 0) {
throw new IllegalArgumentException(
"No good option for numDataUnits or numParityUnits found ");
}
this.numDataUnits = tmpNumDataUnits;
this.numParityUnits = tmpNumParityUnits;
int tmpChunkSize = extractIntOption(CHUNK_SIZE_KEY, allOptions);
if (tmpChunkSize > 0) {
this.chunkSize = tmpChunkSize;
} else {
this.chunkSize = DEFAULT_CHUNK_SIZE;
}
allOptions.remove(CODEC_NAME_KEY);
allOptions.remove(NUM_DATA_UNITS_KEY);
allOptions.remove(NUM_PARITY_UNITS_KEY);
allOptions.remove(CHUNK_SIZE_KEY);
// The known keys above have been consumed; what remains are extra options
this.extraOptions = Collections.unmodifiableMap(allOptions);
}
/**
@@ -94,48 +120,60 @@ public final class ECSchema {
}
/**
* Constructor with key parameters provided. Note the extraOptions may contain
* additional information for the erasure codec to interpret further.
* @param schemaName schema name
* @param codecName codec name
* @param numDataUnits number of data units
* @param numParityUnits number of parity units
* @param extraOptions extra options to configure the codec
*/
public ECSchema(String schemaName, String codecName, int numDataUnits,
int numParityUnits, Map<String, String> extraOptions) {
assert (schemaName != null && ! schemaName.isEmpty());
assert (codecName != null && ! codecName.isEmpty());
assert (numDataUnits > 0 && numParityUnits > 0);
this.schemaName = schemaName;
this.codecName = codecName;
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
if (extraOptions == null) {
extraOptions = new HashMap<>();
}
int tmpChunkSize = extractIntOption(CHUNK_SIZE_KEY, extraOptions);
if (tmpChunkSize > 0) {
this.chunkSize = tmpChunkSize;
} else {
this.chunkSize = DEFAULT_CHUNK_SIZE;
}
extraOptions.remove(CHUNK_SIZE_KEY);
// The chunk size option has been consumed; what remains are extra options
this.extraOptions = Collections.unmodifiableMap(extraOptions);
}
private int extractIntOption(String optionKey, Map<String, String> options) {
int result = -1;
try {
if (options.containsKey(optionKey)) {
result = Integer.parseInt(options.get(optionKey));
if (result <= 0) {
throw new IllegalArgumentException("Bad option value " + result +
" found for " + optionKey);
}
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Option value " +
options.get(optionKey) + " for " + optionKey +
" is malformed. It should be an integer");
}
return result;
}
/**
@@ -155,11 +193,11 @@ public final class ECSchema {
}
/**
* Get extra options specific to an erasure code.
* @return extra options
*/
public Map<String, String> getExtraOptions() {
return extraOptions;
}
/**
@@ -194,18 +232,17 @@ public final class ECSchema {
public String toString() {
StringBuilder sb = new StringBuilder("ECSchema=[");
sb.append("Name=" + schemaName + ",");
sb.append(NUM_DATA_UNITS_KEY + "=" + numDataUnits + ",");
sb.append(NUM_PARITY_UNITS_KEY + "=" + numParityUnits + ",");
sb.append(CHUNK_SIZE_KEY + "=" + chunkSize + ",");
sb.append("Name=" + schemaName + ", ");
sb.append("Codec=" + codecName + ", ");
sb.append(NUM_DATA_UNITS_KEY + "=" + numDataUnits + ", ");
sb.append(NUM_PARITY_UNITS_KEY + "=" + numParityUnits + ", ");
sb.append(CHUNK_SIZE_KEY + "=" + chunkSize +
(extraOptions.isEmpty() ? "" : ", "));
int i = 0;
for (String opt : extraOptions.keySet()) {
sb.append(opt + "=" + extraOptions.get(opt) +
(++i < extraOptions.size() ? ", " : ""));
}
sb.append("]");
@@ -239,14 +276,14 @@ public final class ECSchema {
if (!codecName.equals(ecSchema.codecName)) {
return false;
}
return extraOptions.equals(ecSchema.extraOptions);
}
@Override
public int hashCode() {
int result = schemaName.hashCode();
result = 31 * result + codecName.hashCode();
result = 31 * result + extraOptions.hashCode();
result = 31 * result + numDataUnits;
result = 31 * result + numParityUnits;
result = 31 * result + chunkSize;
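
For reference, a minimal usage sketch (not part of the commit) of the two construction paths above. The class name and the "myFlag" option are hypothetical; the key constants are the ones declared on ECSchema:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class ECSchemaUsageSketch {
  public static void main(String[] args) {
    // Path 1: parse from a raw options map. The well-known keys (codec,
    // numDataUnits, numParityUnits, chunkSize) are consumed; anything left
    // over lands in the unmodifiable extraOptions map.
    Map<String, String> allOptions = new HashMap<>();
    allOptions.put(ECSchema.CODEC_NAME_KEY, "rs");
    allOptions.put(ECSchema.NUM_DATA_UNITS_KEY, "6");
    allOptions.put(ECSchema.NUM_PARITY_UNITS_KEY, "3");
    allOptions.put("myFlag", "true"); // hypothetical codec-specific option
    ECSchema parsed = new ECSchema("RS-6-3", allOptions);
    System.out.println(parsed.getExtraOptions()); // {myFlag=true}

    // Path 2: pass the key parameters directly. With no chunkSize option,
    // the schema falls back to DEFAULT_CHUNK_SIZE (256 * 1024 = 262144).
    ECSchema direct = new ECSchema("RS-6-3", "rs", 6, 3, null);
    System.out.println(direct.getChunkSize()); // 262144
  }
}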

File: TestECSchema.java

@@ -49,6 +49,6 @@ public class TestECSchema {
assertEquals(numParityUnits, schema.getNumParityUnits());
assertEquals(chunkSize, schema.getChunkSize());
assertEquals(codec, schema.getCodecName());
assertEquals(extraOptionValue, schema.getExtraOptions().get(extraOption));
}
}

File: TestSchemaLoader.java

@@ -59,16 +59,16 @@ public class TestSchemaLoader {
ECSchema schema1 = schemas.get(0);
assertEquals("RSk6m3", schema1.getSchemaName());
assertEquals(0, schema1.getExtraOptions().size());
assertEquals(6, schema1.getNumDataUnits());
assertEquals(3, schema1.getNumParityUnits());
assertEquals("RS", schema1.getCodecName());
ECSchema schema2 = schemas.get(1);
assertEquals("RSk10m4", schema2.getSchemaName());
assertEquals(0, schema2.getExtraOptions().size());
assertEquals(10, schema2.getNumDataUnits());
assertEquals(4, schema2.getNumParityUnits());
assertEquals("RS", schema2.getCodecName());
}
}
}

File: CHANGES-HDFS-EC-7285.txt

@@ -116,3 +116,6 @@
HDFS-8024. Erasure Coding: ECworker frame, basics, bootstraping and configuration.
(umamahesh)
HDFS-8156. Add/implement necessary APIs even we just have the system default
schema. (Kai Zheng via Zhe Zhang)

File: PBHelper.java

@@ -3143,7 +3143,7 @@ public class PBHelper {
.setCodecName(schema.getCodecName())
.setDataUnits(schema.getNumDataUnits())
.setParityUnits(schema.getNumParityUnits());
Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
for (Entry<String, String> entry : entrySet) {
builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
.setKey(entry.getKey()).setValue(entry.getValue()).build());
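
For context, the surrounding convertECSchema method now reads roughly as below. This is a hedged reconstruction, since the hunk shows only the changed line; the builder creation and the setSchemaName call are assumed, not shown in the diff:

public static ECSchemaProto convertECSchema(ECSchema schema) {
  ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
      .setSchemaName(schema.getSchemaName()) // assumed; outside the hunk
      .setCodecName(schema.getCodecName())
      .setDataUnits(schema.getNumDataUnits())
      .setParityUnits(schema.getNumParityUnits());
  // Only codec-specific extras travel as generic key-value entries now;
  // the well-known parameters have dedicated proto fields above.
  Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
  for (Entry<String, String> entry : entrySet) {
    builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
        .setKey(entry.getKey()).setValue(entry.getValue()).build());
  }
  return builder.build();
}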

File: ECSchemaManager.java

@@ -20,22 +20,62 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECSchema;
import java.util.Map;
import java.util.TreeMap;
/**
* This manages EC schemas predefined and activated in the system.
* It loads customized schemas and syncs with persisted ones in the
* NameNode image.
*
* This class is instantiated by the FSNamesystem.
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public final class ECSchemaManager {
/**
* TODO: HDFS-8095
*/
private static final int DEFAULT_DATA_BLOCKS = 6;
private static final int DEFAULT_PARITY_BLOCKS = 3;
private static final String DEFAULT_CODEC_NAME = "rs";
private static final String DEFAULT_SCHEMA_NAME = "SYS-DEFAULT-RS-6-3";
private static final String DEFAULT_SCHEMA_NAME = "RS-6-3";
private static ECSchema SYS_DEFAULT_SCHEMA = new ECSchema(DEFAULT_SCHEMA_NAME,
DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS);
// We may add more later.
private static ECSchema[] SYS_SCHEMAS = new ECSchema[] {
SYS_DEFAULT_SCHEMA
};
/**
* All active EC schemas maintained in NN memory for fast querying,
* identified and sorted by schema name.
*/
private final Map<String, ECSchema> activeSchemas;
ECSchemaManager() {
this.activeSchemas = new TreeMap<String, ECSchema>();
for (ECSchema schema : SYS_SCHEMAS) {
activeSchemas.put(schema.getSchemaName(), schema);
}
/**
* TODO: HDFS-7859 persist into NameNode
* load persistent schemas from image and editlog, which is done only once
* during NameNode startup. This can be done here or in a separate method.
*/
}
/**
* Get system defined schemas.
* @return system schemas
*/
public static ECSchema[] getSystemSchemas() {
return SYS_SCHEMAS;
}
/**
* Get system-wide default EC schema, which can be used by default when no
@@ -56,7 +96,32 @@ public final class ECSchemaManager {
throw new IllegalArgumentException("Invalid schema parameter");
}
// schema name is the identifier.
return SYS_DEFAULT_SCHEMA.getSchemaName().equals(schema.getSchemaName());
}
/**
* Get all EC schemas that are available for use.
* @return all EC schemas
*/
public ECSchema[] getSchemas() {
ECSchema[] results = new ECSchema[activeSchemas.size()];
return activeSchemas.values().toArray(results);
}
/**
* Get the EC schema specified by the schema name.
* @param schemaName schema name
* @return EC schema specified by the schema name
*/
public ECSchema getSchema(String schemaName) {
return activeSchemas.get(schemaName);
}
/**
* Clear and clean up
*/
public void clear() {
activeSchemas.clear();
}
}
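
A short, hypothetical sketch of how the new query APIs compose (the constructor is package-private, so NameNode-package context is assumed; not part of the commit):

ECSchemaManager manager = new ECSchemaManager();

ECSchema[] active = manager.getSchemas();      // currently just the system ones
ECSchema byName = manager.getSchema("RS-6-3"); // null if no such active schema
ECSchema sysDefault = ECSchemaManager.getSystemDefaultSchema();

// The default schema is registered as an active schema at construction time.
assert byName != null && byName.equals(sysDefault);

manager.clear(); // e.g. when FSNamesystem.clear() tears the namesystem down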

File: ErasureCodingZoneManager.java

@@ -23,8 +23,6 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.ECZoneInfo;
import org.apache.hadoop.io.erasurecode.ECSchema;
import java.io.IOException;
@@ -80,9 +78,8 @@ public class ErasureCodingZoneManager {
: inode.getXAttrFeature().getXAttrs();
for (XAttr xAttr : xAttrs) {
if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
String schemaName = new String(xAttr.getValue());
ECSchema schema = dir.getFSNamesystem().getECSchema(schemaName);
return new ECZoneInfo(inode.getFullPathName(), schema);
}
}
@@ -109,13 +106,14 @@ public class ErasureCodingZoneManager {
throw new IOException("Directory " + src + " is already in an " +
"erasure coding zone.");
}
// The system default schema will be used since none was specified.
if (schema == null) {
schema = ECSchemaManager.getSystemDefaultSchema();
}
// Now persist the schema name in xattr
byte[] schemaBytes = schema.getSchemaName().getBytes();
final XAttr ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
schemaBytes);
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
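
Persisting just the schema name keeps the xattr small and makes the NameNode's active schema set the single source of truth, at the cost of a name lookup on the read path. A hedged sketch of the round trip (note that getBytes() and new String(byte[]) with no explicit charset use the platform default encoding, which a follow-up may want to pin to UTF-8):

// Write path (sketch): the zone xattr value is only the schema name.
byte[] schemaBytes = schema.getSchemaName().getBytes();

// Read path (sketch): resolve the persisted name against the active schemas.
String schemaName = new String(schemaBytes);
ECSchema resolved = dir.getFSNamesystem().getECSchema(schemaName);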

File: FSNamesystem.java

@@ -428,6 +428,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final BlockManager blockManager;
private final SnapshotManager snapshotManager;
private final CacheManager cacheManager;
private final ECSchemaManager schemaManager;
private final DatanodeStatistics datanodeStatistics;
private String nameserviceId;
@@ -607,6 +608,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
leaseManager.removeAllLeases();
snapshotManager.clearSnapshottableDirs();
cacheManager.clear();
schemaManager.clear();
setImageLoaded(false);
blockManager.clear();
}
@@ -846,6 +848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
this.dir = new FSDirectory(this, conf);
this.snapshotManager = new SnapshotManager(dir);
this.cacheManager = new CacheManager(this, conf, blockManager);
this.schemaManager = new ECSchemaManager();
this.safeMode = new SafeModeInfo(conf);
this.topConf = new TopConf(conf);
this.auditLoggers = initAuditLoggers(conf);
@@ -6616,16 +6619,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
public FSDirectory getFSDirectory() {
return dir;
}
/** Set the FSDirectory. */
@VisibleForTesting
public void setFSDirectory(FSDirectory dir) {
this.dir = dir;
}
/** @return the cache manager. */
public CacheManager getCacheManager() {
return cacheManager;
}
/** @return the schema manager. */
public ECSchemaManager getSchemaManager() {
return schemaManager;
}
@Override // NameNodeMXBean
public String getCorruptFiles() {
List<String> list = new ArrayList<String>();
@@ -7626,9 +7636,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
readLock();
try {
checkOperation(OperationCategory.READ);
return schemaManager.getSchemas();
} finally {
readUnlock();
}
}
/**
* Get the ECSchema specified by the name
*/
ECSchema getECSchema(String schemaName) throws IOException {
checkOperation(OperationCategory.READ);
waitForLoadingFSImage();
readLock();
try {
checkOperation(OperationCategory.READ);
return schemaManager.getSchema(schemaName);
} finally {
readUnlock();
}
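
Both new methods follow the standard FSNamesystem read-path idiom: checkOperation() runs once before the lock is taken, to fail fast (for example on a standby NameNode) without lock overhead, and again after, since the HA state may change while waiting on the lock. The same method, restated with explanatory comments (a sketch, not a change to the code above):

ECSchema getECSchema(String schemaName) throws IOException {
  checkOperation(OperationCategory.READ); // fail fast before locking
  waitForLoadingFSImage();                // ensure the image has been loaded
  readLock();
  try {
    checkOperation(OperationCategory.READ); // re-check under the lock
    return schemaManager.getSchema(schemaName);
  } finally {
    readUnlock();
  }
}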

File: TestECSchemas.java

@@ -48,10 +48,7 @@ public class TestECSchemas {
@Test
public void testGetECSchemas() throws Exception {
ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas();
assertNotNull(ecSchemas);
assertEquals("Should have only one ecSchema", 1, ecSchemas.length);
assertEquals("Returned schemas should have only default schema",
ECSchemaManager.getSystemDefaultSchema(), ecSchemas[0]);
assertTrue("Should have at least one schema", ecSchemas.length > 0);
}
}

File: TestErasureCodingZones.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.io.erasurecode.ECSchema;
@@ -151,35 +152,49 @@ public class TestErasureCodingZones {
}
@Test
public void testGetErasureCodingInfoWithSystemDefaultSchema() throws Exception {
String src = "/ec";
final Path ecDir = new Path(src);
fs.mkdir(ecDir, FsPermission.getDirDefault());
// dir ECInfo before creating ec zone
assertNull(fs.getClient().getErasureCodingInfo(src));
// dir ECInfo after creating ec zone
fs.getClient().createErasureCodingZone(src, null); // The default schema will be used.
ECSchema sysDefaultSchema = ECSchemaManager.getSystemDefaultSchema();
verifyErasureCodingInfo(src, sysDefaultSchema);
fs.create(new Path(ecDir, "/child1")).close();
// verify for the files in ec zone
verifyErasureCodingInfo(src + "/child1", sysDefaultSchema);
}
@Test
public void testGetErasureCodingInfo() throws Exception {
ECSchema[] sysSchemas = ECSchemaManager.getSystemSchemas();
assertTrue("System schemas should be of only 1 for now",
sysSchemas.length == 1);
ECSchema usingSchema = sysSchemas[0];
String src = "/ec2";
final Path ecDir = new Path(src);
fs.mkdir(ecDir, FsPermission.getDirDefault());
// dir ECInfo before creating ec zone
assertNull(fs.getClient().getErasureCodingInfo(src));
// dir ECInfo after creating ec zone
fs.getClient().createErasureCodingZone(src, usingSchema);
verifyErasureCodingInfo(src, usingSchema);
fs.create(new Path(ecDir, "/child1")).close();
// verify for the files in ec zone
verifyErasureCodingInfo(src + "/child1", usingSchema);
}
private void verifyErasureCodingInfo(
String src, ECSchema usingSchema) throws IOException {
ECInfo ecInfo = fs.getClient().getErasureCodingInfo(src);
assertNotNull("ECInfo should have been non-null", ecInfo);
assertEquals(src, ecInfo.getSrc());
ECSchema schema = ecInfo.getSchema();
assertNotNull(schema);
assertEquals("Default schema should be returned", "RS-6-3",
schema.getSchemaName());
assertEquals("Default codec(rs) should be returned", "rs",
schema.getCodecName());
assertEquals("Default numDataUnits should be used", 6,
schema.getNumDataUnits());
assertEquals("Default numParityUnits should be used", 3,
schema.getNumParityUnits());
assertEquals("Default chunkSize should be used",
ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
assertEquals("Actually used schema should be equal with target schema",
usingSchema, schema);
}
}