diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java index e5bf11a2ac6..7403e35c946 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java @@ -66,7 +66,7 @@ protected RawErasureEncoder createRawEncoder(String rawCoderFactoryKey) { * @param isEncoder * @return raw coder */ - protected static RawErasureCoder createRawCoder(Configuration conf, + public static RawErasureCoder createRawCoder(Configuration conf, String rawCoderFactoryKey, boolean isEncoder) { if (conf == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 3d86f05336e..1acde418117 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -113,3 +113,6 @@ HDFS-8212. DistributedFileSystem.createErasureCodingZone should pass schema in FileSystemLinkResolver. (szetszwo via Zhe Zhang) + + HDFS-8024. Erasure Coding: ECworker frame, basics, bootstraping and configuration. + (umamahesh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c127b5f5c1f..68cfe7f57d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -973,6 +973,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) { return REG_CMD; case BlockIdCommand: return PBHelper.convert(proto.getBlkIdCmd()); + case BlockECRecoveryCommand: + return PBHelper.convert(proto.getBlkECRecoveryCmd()); default: return null; } @@ -1123,6 +1125,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); break; + case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand) + .setBlkECRecoveryCmd( + convert((BlockECRecoveryCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 92323f1530b..69baac7ec36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -32,11 +32,13 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -722,6 +724,10 @@ assert getBlockPoolId().equals(bp) : dxcs.balanceThrottler.setBandwidth(bandwidth); } break; + case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY"); + Collection ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks(); + dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d2b293957df..221ba386ac1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -87,6 +87,7 @@ import javax.management.ObjectName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -152,6 +153,7 @@ import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; @@ -354,6 +356,8 @@ public static InetSocketAddress createSocketAddr(String target) { private String dnUserName = null; private SpanReceiverHost spanReceiverHost; + + private ErasureCodingWorker ecWorker; private static final int NUM_CORES = Runtime.getRuntime() .availableProcessors(); private static final double CONGESTION_RATIO = 1.5; @@ -1157,6 +1161,7 @@ void startDataNode(Configuration conf, saslClient = new SaslDataTransferClient(dnConf.conf, dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker } /** @@ -3258,4 +3263,9 @@ public void removeSpanReceiver(long id) throws IOException { checkSuperuserPrivilege(); spanReceiverHost.removeSpanReceiver(id); } + + public ErasureCodingWorker getErasureCodingWorker(){ + return ecWorker; + + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java new file mode 100644 index 00000000000..6430308e7d5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder; + +/** + * ErasureCodingWorker handles the erasure coding recovery work commands. These + * commands would be issued from Namenode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling EC + * commands. + */ +public final class ErasureCodingWorker { + + private Configuration conf; + RawErasureCoder rawEncoder = null; + RawErasureCoder rawDecoder = null; + + public ErasureCodingWorker(Configuration conf) { + this.conf = conf; + initialize(); + } + + /** + * Initializes the required resources for handling the erasure coding recovery + * work. + */ + public void initialize() { + // Right now directly used RS coder. Once other coders integration ready, we + // can load preferred codec here. + initializeErasureEncoder(); + initializeErasureDecoder(); + } + + private void initializeErasureDecoder() { + rawDecoder = AbstractErasureCoder.createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false); + if (rawDecoder == null) { + rawDecoder = new RSRawDecoder(); + } + } + + private void initializeErasureEncoder() { + rawEncoder = AbstractErasureCoder.createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true); + if (rawEncoder == null) { + rawEncoder = new RSRawEncoder(); + } + } + + /** + * Handles the Erasure Coding recovery work commands. + * + * @param ecTasks + * BlockECRecoveryInfo + */ + public void processErasureCodingTasks(Collection ecTasks) { + // HDFS-7348 : Implement the actual recovery process + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index ac9ab460bdf..482e2e9b7bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -59,6 +59,7 @@ message DatanodeCommandProto { UnusedUpgradeCommand = 6; NullDatanodeCommand = 7; BlockIdCommand = 8; + BlockECRecoveryCommand = 9; } required Type cmdType = 1; // Type of the command @@ -72,6 +73,7 @@ message DatanodeCommandProto { optional KeyUpdateCommandProto keyUpdateCmd = 6; optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; + optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9; } /**