HDFS-8024. Erasure Coding: ECworker frame, basics, bootstraping and configuration. (Contributed by Uma Maheswara Rao G)

This commit is contained in:
Uma Maheswara Rao G 2015-04-22 19:30:14 +05:30 committed by Zhe Zhang
parent 2f11109bb0
commit 014d8675c5
7 changed files with 112 additions and 1 deletions

View File

@ -66,7 +66,7 @@ protected RawErasureEncoder createRawEncoder(String rawCoderFactoryKey) {
* @param isEncoder * @param isEncoder
* @return raw coder * @return raw coder
*/ */
protected static RawErasureCoder createRawCoder(Configuration conf, public static RawErasureCoder createRawCoder(Configuration conf,
String rawCoderFactoryKey, boolean isEncoder) { String rawCoderFactoryKey, boolean isEncoder) {
if (conf == null) { if (conf == null) {

View File

@ -113,3 +113,6 @@
HDFS-8212. DistributedFileSystem.createErasureCodingZone should pass schema HDFS-8212. DistributedFileSystem.createErasureCodingZone should pass schema
in FileSystemLinkResolver. (szetszwo via Zhe Zhang) in FileSystemLinkResolver. (szetszwo via Zhe Zhang)
HDFS-8024. Erasure Coding: ECworker frame, basics, bootstraping and configuration.
(umamahesh)

View File

@ -973,6 +973,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
return REG_CMD; return REG_CMD;
case BlockIdCommand: case BlockIdCommand:
return PBHelper.convert(proto.getBlkIdCmd()); return PBHelper.convert(proto.getBlkIdCmd());
case BlockECRecoveryCommand:
return PBHelper.convert(proto.getBlkECRecoveryCmd());
default: default:
return null; return null;
} }
@ -1123,6 +1125,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
break; break;
case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand)
.setBlkECRecoveryCmd(
convert((BlockECRecoveryCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default: default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);

View File

@ -32,11 +32,13 @@
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -722,6 +724,10 @@ assert getBlockPoolId().equals(bp) :
dxcs.balanceThrottler.setBandwidth(bandwidth); dxcs.balanceThrottler.setBandwidth(bandwidth);
} }
break; break;
case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
default: default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
} }

View File

@ -87,6 +87,7 @@
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -152,6 +153,7 @@
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@ -354,6 +356,8 @@ public static InetSocketAddress createSocketAddr(String target) {
private String dnUserName = null; private String dnUserName = null;
private SpanReceiverHost spanReceiverHost; private SpanReceiverHost spanReceiverHost;
private ErasureCodingWorker ecWorker;
private static final int NUM_CORES = Runtime.getRuntime() private static final int NUM_CORES = Runtime.getRuntime()
.availableProcessors(); .availableProcessors();
private static final double CONGESTION_RATIO = 1.5; private static final double CONGESTION_RATIO = 1.5;
@ -1157,6 +1161,7 @@ void startDataNode(Configuration conf,
saslClient = new SaslDataTransferClient(dnConf.conf, saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver); dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker
} }
/** /**
@ -3258,4 +3263,9 @@ public void removeSpanReceiver(long id) throws IOException {
checkSuperuserPrivilege(); checkSuperuserPrivilege();
spanReceiverHost.removeSpanReceiver(id); spanReceiverHost.removeSpanReceiver(id);
} }
public ErasureCodingWorker getErasureCodingWorker(){
return ecWorker;
}
} }

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
/**
* ErasureCodingWorker handles the erasure coding recovery work commands. These
* commands would be issued from Namenode as part of Datanode's heart beat
* response. BPOfferService delegates the work to this class for handling EC
* commands.
*/
public final class ErasureCodingWorker {
private Configuration conf;
RawErasureCoder rawEncoder = null;
RawErasureCoder rawDecoder = null;
public ErasureCodingWorker(Configuration conf) {
this.conf = conf;
initialize();
}
/**
* Initializes the required resources for handling the erasure coding recovery
* work.
*/
public void initialize() {
// Right now directly used RS coder. Once other coders integration ready, we
// can load preferred codec here.
initializeErasureEncoder();
initializeErasureDecoder();
}
private void initializeErasureDecoder() {
rawDecoder = AbstractErasureCoder.createRawCoder(conf,
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false);
if (rawDecoder == null) {
rawDecoder = new RSRawDecoder();
}
}
private void initializeErasureEncoder() {
rawEncoder = AbstractErasureCoder.createRawCoder(conf,
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true);
if (rawEncoder == null) {
rawEncoder = new RSRawEncoder();
}
}
/**
* Handles the Erasure Coding recovery work commands.
*
* @param ecTasks
* BlockECRecoveryInfo
*/
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
// HDFS-7348 : Implement the actual recovery process
}
}

View File

@ -59,6 +59,7 @@ message DatanodeCommandProto {
UnusedUpgradeCommand = 6; UnusedUpgradeCommand = 6;
NullDatanodeCommand = 7; NullDatanodeCommand = 7;
BlockIdCommand = 8; BlockIdCommand = 8;
BlockECRecoveryCommand = 9;
} }
required Type cmdType = 1; // Type of the command required Type cmdType = 1; // Type of the command
@ -72,6 +73,7 @@ message DatanodeCommandProto {
optional KeyUpdateCommandProto keyUpdateCmd = 6; optional KeyUpdateCommandProto keyUpdateCmd = 6;
optional RegisterCommandProto registerCmd = 7; optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8; optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9;
} }
/** /**