HDFS-16655. OIV: print out erasure coding policy name in oiv Delimited output (#4541). Contributed by Max Xie.
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
8f83d9f56d
commit
2f49eec5dd
|
@ -80,6 +80,7 @@ public class OfflineImageViewerPB {
|
|||
+ " delimiter. The default delimiter is \\t, though this may be\n"
|
||||
+ " changed via the -delimiter argument.\n"
|
||||
+ " -sp print storage policy, used by delimiter only.\n"
|
||||
+ " -ec print erasure coding policy, used by delimiter only.\n"
|
||||
+ " * DetectCorruption: Detect potential corruption of the image by\n"
|
||||
+ " selectively loading parts of it and actively searching for\n"
|
||||
+ " inconsistencies. Outputs a summary of the found corruptions\n"
|
||||
|
@ -132,6 +133,7 @@ public class OfflineImageViewerPB {
|
|||
options.addOption("addr", true, "");
|
||||
options.addOption("delimiter", true, "");
|
||||
options.addOption("sp", false, "");
|
||||
options.addOption("ec", false, "");
|
||||
options.addOption("t", "temp", true, "");
|
||||
options.addOption("m", "multiThread", true, "");
|
||||
|
||||
|
@ -228,9 +230,11 @@ public class OfflineImageViewerPB {
|
|||
break;
|
||||
case "DELIMITED":
|
||||
boolean printStoragePolicy = cmd.hasOption("sp");
|
||||
boolean printECPolicy = cmd.hasOption("ec");
|
||||
try (PBImageDelimitedTextWriter writer =
|
||||
new PBImageDelimitedTextWriter(out, delimiter,
|
||||
tempPath, printStoragePolicy, threads, outputFile)) {
|
||||
tempPath, printStoragePolicy, printECPolicy, threads,
|
||||
outputFile, conf)) {
|
||||
writer.visit(inputFile);
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeFile;
|
||||
|
@ -46,6 +49,8 @@ import java.text.SimpleDateFormat;
|
|||
public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
||||
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm";
|
||||
private boolean printStoragePolicy;
|
||||
private boolean printECPolicy;
|
||||
private ErasureCodingPolicyManager ecManager;
|
||||
|
||||
static class OutputEntryBuilder {
|
||||
private final SimpleDateFormat dateFormatter =
|
||||
|
@ -62,6 +67,7 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
private long nsQuota = 0;
|
||||
private long dsQuota = 0;
|
||||
private int storagePolicy = 0;
|
||||
private String ecPolicy = "-";
|
||||
|
||||
private String dirPermission = "-";
|
||||
private PermissionStatus permissionStatus;
|
||||
|
@ -83,6 +89,13 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
aclPermission = "+";
|
||||
}
|
||||
storagePolicy = file.getStoragePolicyID();
|
||||
if (writer.printECPolicy && file.hasErasureCodingPolicyID()) {
|
||||
ErasureCodingPolicy policy = writer.ecManager.
|
||||
getByID((byte) file.getErasureCodingPolicyID());
|
||||
if (policy != null) {
|
||||
ecPolicy = policy.getName();
|
||||
}
|
||||
}
|
||||
break;
|
||||
case DIRECTORY:
|
||||
INodeDirectory dir = inode.getDirectory();
|
||||
|
@ -95,6 +108,12 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
aclPermission = "+";
|
||||
}
|
||||
storagePolicy = writer.getStoragePolicy(dir.getXAttrs());
|
||||
if (writer.printECPolicy) {
|
||||
String name= writer.getErasureCodingPolicyName(dir.getXAttrs());
|
||||
if (name != null) {
|
||||
ecPolicy = name;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case SYMLINK:
|
||||
INodeSymlink s = inode.getSymlink();
|
||||
|
@ -134,6 +153,9 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
if (writer.printStoragePolicy) {
|
||||
writer.append(buffer, storagePolicy);
|
||||
}
|
||||
if (writer.printECPolicy) {
|
||||
writer.append(buffer, ecPolicy);
|
||||
}
|
||||
return buffer.substring(1);
|
||||
}
|
||||
}
|
||||
|
@ -146,14 +168,21 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
|
||||
String tempPath, boolean printStoragePolicy)
|
||||
throws IOException {
|
||||
this(out, delimiter, tempPath, printStoragePolicy, 1, "-");
|
||||
this(out, delimiter, tempPath, printStoragePolicy, false, 1, "-", null);
|
||||
}
|
||||
|
||||
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
|
||||
String tempPath, boolean printStoragePolicy, int threads,
|
||||
String parallelOut) throws IOException {
|
||||
String tempPath, boolean printStoragePolicy,
|
||||
boolean printECPolicy, int threads,
|
||||
String parallelOut, Configuration conf)
|
||||
throws IOException {
|
||||
super(out, delimiter, tempPath, threads, parallelOut);
|
||||
this.printStoragePolicy = printStoragePolicy;
|
||||
if (printECPolicy && conf != null) {
|
||||
this.printECPolicy = true;
|
||||
ecManager = ErasureCodingPolicyManager.getInstance();
|
||||
ecManager.init(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -187,6 +216,9 @@ public class PBImageDelimitedTextWriter extends PBImageTextWriter {
|
|||
if (printStoragePolicy) {
|
||||
append(buffer, "StoragePolicyId");
|
||||
}
|
||||
if (printECPolicy) {
|
||||
append(buffer, "ErasureCodingPolicy");
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,8 @@ import java.io.InputStream;
|
|||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
|
@ -63,6 +65,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
|
|||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SerialNumberManager;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.util.LimitInputStream;
|
||||
import org.apache.hadoop.util.Lists;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -77,6 +80,8 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_POLICY;
|
||||
|
||||
/**
|
||||
* This class reads the protobuf-based fsimage and generates text output
|
||||
* for each inode to {@link PBImageTextWriter#out}. The sub-class can override
|
||||
|
@ -1029,4 +1034,23 @@ abstract class PBImageTextWriter implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public String getErasureCodingPolicyName
|
||||
(INodeSection.XAttrFeatureProto xattrFeatureProto) {
|
||||
List<XAttr> xattrs =
|
||||
FSImageFormatPBINode.Loader.loadXAttrs(xattrFeatureProto, stringTable);
|
||||
for (XAttr xattr : xattrs) {
|
||||
if (XATTR_ERASURECODING_POLICY.contains(xattr.getName())){
|
||||
try{
|
||||
ByteArrayInputStream bIn = new ByteArrayInputStream(xattr.getValue());
|
||||
DataInputStream dIn = new DataInputStream(bIn);
|
||||
return WritableUtils.readString(dIn);
|
||||
} catch (IOException ioException){
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Tests OfflineImageViewer if the input fsimage has HDFS ErasureCodingPolicy
|
||||
* entries.
|
||||
*/
|
||||
public class TestOfflineImageViewerForErasureCodingPolicy {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestOfflineImageViewerForErasureCodingPolicy.class);
|
||||
|
||||
private static File originalFsimage = null;
|
||||
private static File tempDir;
|
||||
|
||||
/**
|
||||
* Create a populated namespace for later testing. Save its contents to a
|
||||
* data structure and store its fsimage location.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void createOriginalFSImage() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
|
||||
|
||||
File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(
|
||||
MiniDFSCluster.getBaseDirectory(), 0, 0);
|
||||
tempDir = nnDirs[0];
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem hdfs = cluster.getFileSystem();
|
||||
|
||||
hdfs.enableErasureCodingPolicy("RS-6-3-1024k");
|
||||
hdfs.enableErasureCodingPolicy("RS-3-2-1024k");
|
||||
|
||||
Path dir = new Path("/dir_wo_ec_rs63");
|
||||
hdfs.mkdirs(dir);
|
||||
hdfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
|
||||
|
||||
dir = new Path("/dir_wo_ec_rs63/sub_dir_1");
|
||||
hdfs.mkdirs(dir);
|
||||
|
||||
dir = new Path("/dir_wo_ec_rs63/sub_dir_2");
|
||||
hdfs.mkdirs(dir);
|
||||
|
||||
Path file = new Path("/dir_wo_ec_rs63/file_wo_ec_1");
|
||||
try (FSDataOutputStream o = hdfs.create(file)) {
|
||||
o.write(123);
|
||||
}
|
||||
|
||||
file = new Path("/dir_wo_ec_rs63/file_wo_ec_2");
|
||||
try (FSDataOutputStream o = hdfs.create(file)) {
|
||||
o.write(123);
|
||||
}
|
||||
|
||||
dir = new Path("/dir_wo_ec_rs32");
|
||||
hdfs.mkdirs(dir);
|
||||
hdfs.setErasureCodingPolicy(dir, "RS-3-2-1024k");
|
||||
|
||||
dir = new Path("/dir_wo_ec_rs32/sub_dir_1");
|
||||
hdfs.mkdirs(dir);
|
||||
|
||||
file = new Path("/dir_wo_ec_rs32/file_wo_ec");
|
||||
try (FSDataOutputStream o = hdfs.create(file)) {
|
||||
o.write(123);
|
||||
}
|
||||
|
||||
dir = new Path("/dir_wo_rep");
|
||||
hdfs.mkdirs(dir);
|
||||
|
||||
dir = new Path("/dir_wo_rep/sub_dir_1");
|
||||
hdfs.mkdirs(dir);
|
||||
|
||||
file = new Path("/dir_wo_rep/file_rep");
|
||||
try (FSDataOutputStream o = hdfs.create(file)) {
|
||||
o.write(123);
|
||||
}
|
||||
|
||||
// Write results to the fsimage file
|
||||
hdfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER, false);
|
||||
hdfs.saveNamespace();
|
||||
|
||||
// Determine the location of the fsimage file
|
||||
originalFsimage = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
|
||||
.getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
|
||||
if (originalFsimage == null) {
|
||||
throw new RuntimeException("Didn't generate or can't find fsimage");
|
||||
}
|
||||
LOG.debug("original FS image file is " + originalFsimage);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void deleteOriginalFSImage() throws IOException {
|
||||
if (originalFsimage != null && originalFsimage.exists()) {
|
||||
originalFsimage.delete();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPBDelimitedWriterForErasureCodingPolicy() throws Exception {
|
||||
String expected = DFSTestUtil.readResoucePlainFile(
|
||||
"testErasureCodingPolicy.csv");
|
||||
String result = readECPolicyFromFsimageFile();
|
||||
assertEquals(expected, result);
|
||||
}
|
||||
|
||||
private String readECPolicyFromFsimageFile() throws Exception {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
String delemiter = "\t";
|
||||
|
||||
File delimitedOutput = new File(tempDir, "delimitedOutput");
|
||||
|
||||
if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
|
||||
"-i", originalFsimage.getAbsolutePath(),
|
||||
"-o", delimitedOutput.getAbsolutePath(),
|
||||
"-ec"}) != 0) {
|
||||
throw new IOException("oiv returned failure creating " +
|
||||
"delimited output with ec.");
|
||||
}
|
||||
|
||||
try (InputStream input = new FileInputStream(delimitedOutput);
|
||||
BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(input))) {
|
||||
String line;
|
||||
boolean header = true;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
String[] fields = line.split(delemiter);
|
||||
if (!header) {
|
||||
String path = fields[0];
|
||||
String ecPolicy = fields[12];
|
||||
builder.append(path).append(",").append(ecPolicy).append("\n");
|
||||
}
|
||||
header = false;
|
||||
}
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
#dir,erasure coding policy
|
||||
/,-
|
||||
/dir_wo_ec_rs63,RS-6-3-1024k
|
||||
/dir_wo_ec_rs63/sub_dir_1,-
|
||||
/dir_wo_ec_rs63/sub_dir_2,-
|
||||
/dir_wo_ec_rs63/file_wo_ec_1,RS-6-3-1024k
|
||||
/dir_wo_ec_rs63/file_wo_ec_2,RS-6-3-1024k
|
||||
/dir_wo_ec_rs32,RS-3-2-1024k
|
||||
/dir_wo_ec_rs32/sub_dir_1,-
|
||||
/dir_wo_ec_rs32/file_wo_ec,RS-3-2-1024k
|
||||
/dir_wo_rep,-
|
||||
/dir_wo_rep/sub_dir_1,-
|
||||
/dir_wo_rep/file_rep,-
|
Can't render this file because it contains an unexpected character in line 5 and column 8.
|
Loading…
Reference in New Issue