HDFS-10994. Support an XOR policy XOR-2-1-64k in HDFS. Contributed by Sammi Chen
This commit is contained in:
parent
209e805430
commit
5d5614f847
|
@ -38,4 +38,7 @@ public final class ErasureCodeConstants {
|
|||
|
||||
public static final ECSchema RS_6_3_LEGACY_SCHEMA = new ECSchema(
|
||||
RS_LEGACY_CODEC_NAME, 6, 3);
|
||||
|
||||
public static final ECSchema XOR_2_1_SCHEMA = new ECSchema(
|
||||
XOR_CODEC_NAME, 2, 1);
|
||||
}
|
||||
|
|
|
@ -147,6 +147,7 @@ public final class HdfsConstants {
|
|||
public static final byte RS_6_3_POLICY_ID = 0;
|
||||
public static final byte RS_3_2_POLICY_ID = 1;
|
||||
public static final byte RS_6_3_LEGACY_POLICY_ID = 2;
|
||||
public static final byte XOR_2_1_POLICY_ID = 3;
|
||||
|
||||
/* Hidden constructor */
|
||||
protected HdfsConstants() {
|
||||
|
|
|
@ -36,7 +36,7 @@ import java.util.TreeMap;
|
|||
public final class ErasureCodingPolicyManager {
|
||||
|
||||
/**
|
||||
* TODO: HDFS-8095
|
||||
* TODO: HDFS-8095.
|
||||
*/
|
||||
private static final int DEFAULT_CELLSIZE = 64 * 1024;
|
||||
private static final ErasureCodingPolicy SYS_POLICY1 =
|
||||
|
@ -48,10 +48,14 @@ public final class ErasureCodingPolicyManager {
|
|||
private static final ErasureCodingPolicy SYS_POLICY3 =
|
||||
new ErasureCodingPolicy(ErasureCodeConstants.RS_6_3_LEGACY_SCHEMA,
|
||||
DEFAULT_CELLSIZE, HdfsConstants.RS_6_3_LEGACY_POLICY_ID);
|
||||
private static final ErasureCodingPolicy SYS_POLICY4 =
|
||||
new ErasureCodingPolicy(ErasureCodeConstants.XOR_2_1_SCHEMA,
|
||||
DEFAULT_CELLSIZE, HdfsConstants.XOR_2_1_POLICY_ID);
|
||||
|
||||
//We may add more later.
|
||||
private static final ErasureCodingPolicy[] SYS_POLICIES =
|
||||
new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3};
|
||||
new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3,
|
||||
SYS_POLICY4};
|
||||
|
||||
// Supported storage policies for striped EC files
|
||||
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] {
|
||||
|
@ -96,6 +100,19 @@ public final class ErasureCodingPolicyManager {
|
|||
return SYS_POLICY1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get system-wide policy by policy ID.
|
||||
* @return ecPolicy
|
||||
*/
|
||||
public static ErasureCodingPolicy getPolicyByPolicyID(byte id) {
|
||||
for (ErasureCodingPolicy policy : SYS_POLICIES) {
|
||||
if (policy.getId() == id) {
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all policies that's available to use.
|
||||
* @return all policies
|
||||
|
@ -141,7 +158,7 @@ public final class ErasureCodingPolicyManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Clear and clean up
|
||||
* Clear and clean up.
|
||||
*/
|
||||
public void clear() {
|
||||
activePoliciesByName.clear();
|
||||
|
|
|
@ -455,9 +455,13 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
if(!isStriped()){
|
||||
return max;
|
||||
}
|
||||
// TODO support more policies based on policyId
|
||||
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
getErasureCodingPolicyID());
|
||||
if (ecPolicy == null){
|
||||
ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
}
|
||||
return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
|
||||
}
|
||||
|
||||
|
|
|
@ -1888,21 +1888,41 @@ public class DFSTestUtil {
|
|||
* Creates the metadata of a file in striped layout. This method only
|
||||
* manipulates the NameNode state without injecting data to DataNode.
|
||||
* You should disable periodical heartbeat before use this.
|
||||
* @param file Path of the file to create
|
||||
* @param file Path of the file to create
|
||||
* @param dir Parent path of the file
|
||||
* @param numBlocks Number of striped block groups to add to the file
|
||||
* @param numStripesPerBlk Number of striped cells in each block
|
||||
* @param toMkdir
|
||||
*/
|
||||
public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
|
||||
int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
|
||||
public static void createStripedFile(MiniDFSCluster cluster, Path file,
|
||||
Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir)
|
||||
throws Exception {
|
||||
createStripedFile(cluster, file, dir, numBlocks, numStripesPerBlk,
|
||||
toMkdir, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the metadata of a file in striped layout. This method only
|
||||
* manipulates the NameNode state without injecting data to DataNode.
|
||||
* You should disable periodical heartbeat before use this.
|
||||
* @param file Path of the file to create
|
||||
* @param dir Parent path of the file
|
||||
* @param numBlocks Number of striped block groups to add to the file
|
||||
* @param numStripesPerBlk Number of striped cells in each block
|
||||
* @param toMkdir
|
||||
* @param ecPolicy erasure coding policy apply to created file. A null value
|
||||
* means using default erasure coding policy.
|
||||
*/
|
||||
public static void createStripedFile(MiniDFSCluster cluster, Path file,
|
||||
Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir,
|
||||
ErasureCodingPolicy ecPolicy) throws Exception {
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
// If outer test already set EC policy, dir should be left as null
|
||||
if (toMkdir) {
|
||||
assert dir != null;
|
||||
dfs.mkdirs(dir);
|
||||
try {
|
||||
dfs.getClient().setErasureCodingPolicy(dir.toString(), null);
|
||||
dfs.getClient().setErasureCodingPolicy(dir.toString(), ecPolicy);
|
||||
} catch (IOException e) {
|
||||
if (!e.getMessage().contains("non-empty directory")) {
|
||||
throw e;
|
||||
|
|
|
@ -64,20 +64,34 @@ public class TestDFSStripedInputStream {
|
|||
private DistributedFileSystem fs;
|
||||
private final Path dirPath = new Path("/striped");
|
||||
private Path filePath = new Path(dirPath, "file");
|
||||
private final ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
|
||||
private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
|
||||
private final int cellSize = ecPolicy.getCellSize();
|
||||
private ErasureCodingPolicy ecPolicy;
|
||||
private short dataBlocks;
|
||||
private short parityBlocks;
|
||||
private int cellSize;
|
||||
private final int stripesPerBlock = 2;
|
||||
private final int blockSize = stripesPerBlock * cellSize;
|
||||
private final int blockGroupSize = dataBlocks * blockSize;
|
||||
private int blockSize;
|
||||
private int blockGroupSize;
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(300000);
|
||||
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
/*
|
||||
* Initialize erasure coding policy.
|
||||
*/
|
||||
ecPolicy = getEcPolicy();
|
||||
dataBlocks = (short) ecPolicy.getNumDataUnits();
|
||||
parityBlocks = (short) ecPolicy.getNumParityUnits();
|
||||
cellSize = ecPolicy.getCellSize();
|
||||
blockSize = stripesPerBlock * cellSize;
|
||||
blockGroupSize = dataBlocks * blockSize;
|
||||
System.out.println("EC policy = " + ecPolicy);
|
||||
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
|
||||
if (ErasureCodeNative.isNativeCodeLoaded()) {
|
||||
|
@ -94,7 +108,7 @@ public class TestDFSStripedInputStream {
|
|||
}
|
||||
fs = cluster.getFileSystem();
|
||||
fs.mkdirs(dirPath);
|
||||
fs.getClient().setErasureCodingPolicy(dirPath.toString(), null);
|
||||
fs.getClient().setErasureCodingPolicy(dirPath.toString(), ecPolicy);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -106,13 +120,13 @@ public class TestDFSStripedInputStream {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test {@link DFSStripedInputStream#getBlockAt(long)}
|
||||
* Test {@link DFSStripedInputStream#getBlockAt(long)}.
|
||||
*/
|
||||
@Test
|
||||
public void testRefreshBlock() throws Exception {
|
||||
final int numBlocks = 4;
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false);
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||
filePath.toString(), 0, blockGroupSize * numBlocks);
|
||||
final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||
|
@ -136,7 +150,7 @@ public class TestDFSStripedInputStream {
|
|||
public void testPread() throws Exception {
|
||||
final int numBlocks = 2;
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false);
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||
filePath.toString(), 0, blockGroupSize * numBlocks);
|
||||
int fileLen = blockGroupSize * numBlocks;
|
||||
|
@ -154,7 +168,9 @@ public class TestDFSStripedInputStream {
|
|||
bg.getBlock().getBlockPoolId());
|
||||
}
|
||||
|
||||
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
||||
/**
|
||||
* A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks
|
||||
*/
|
||||
for (int i = 0; i < stripesPerBlock; i++) {
|
||||
for (int j = 0; j < dataBlocks; j++) {
|
||||
for (int k = 0; k < cellSize; k++) {
|
||||
|
@ -194,7 +210,7 @@ public class TestDFSStripedInputStream {
|
|||
final int numBlocks = 4;
|
||||
final int failedDNIdx = dataBlocks - 1;
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false);
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||
filePath.toString(), 0, blockGroupSize);
|
||||
|
||||
|
@ -305,7 +321,7 @@ public class TestDFSStripedInputStream {
|
|||
setup();
|
||||
}
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false);
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.
|
||||
getBlockLocations(filePath.toString(), 0, fileSize);
|
||||
|
||||
|
@ -330,7 +346,9 @@ public class TestDFSStripedInputStream {
|
|||
byte[] expected = new byte[fileSize];
|
||||
|
||||
for (LocatedBlock bg : lbs.getLocatedBlocks()) {
|
||||
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
||||
/**
|
||||
* A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks
|
||||
*/
|
||||
for (int i = 0; i < stripesPerBlock; i++) {
|
||||
for (int j = 0; j < dataBlocks; j++) {
|
||||
for (int k = 0; k < cellSize; k++) {
|
||||
|
@ -371,7 +389,7 @@ public class TestDFSStripedInputStream {
|
|||
final int numBlocks = 4;
|
||||
final int failedDNIdx = dataBlocks - 1;
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false);
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||
filePath.toString(), 0, blockGroupSize);
|
||||
|
||||
|
|
|
@ -47,23 +47,36 @@ public class TestDFSStripedOutputStream {
|
|||
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
private final ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
||||
private final int parityBlocks = ecPolicy.getNumParityUnits();
|
||||
private ErasureCodingPolicy ecPolicy;
|
||||
private int dataBlocks;
|
||||
private int parityBlocks;
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private Configuration conf;
|
||||
private final int cellSize = ecPolicy.getCellSize();
|
||||
private int cellSize;
|
||||
private final int stripesPerBlock = 4;
|
||||
private final int blockSize = cellSize * stripesPerBlock;
|
||||
private int blockSize;
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(300000);
|
||||
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
/*
|
||||
* Initialize erasure coding policy.
|
||||
*/
|
||||
ecPolicy = getEcPolicy();
|
||||
dataBlocks = (short) ecPolicy.getNumDataUnits();
|
||||
parityBlocks = (short) ecPolicy.getNumParityUnits();
|
||||
cellSize = ecPolicy.getCellSize();
|
||||
blockSize = stripesPerBlock * cellSize;
|
||||
System.out.println("EC policy = " + ecPolicy);
|
||||
|
||||
int numDNs = dataBlocks + parityBlocks + 2;
|
||||
conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
|
@ -76,7 +89,7 @@ public class TestDFSStripedOutputStream {
|
|||
NativeRSRawErasureCoderFactory.class.getCanonicalName());
|
||||
}
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||
cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
|
||||
cluster.getFileSystem().getClient().setErasureCodingPolicy("/", ecPolicy);
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
import org.apache.log4j.Level;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -76,18 +77,36 @@ public class TestDFSStripedOutputStreamWithFailure {
|
|||
.getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
private final ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
||||
private final int parityBlocks = ecPolicy.getNumParityUnits();
|
||||
private final int cellSize = ecPolicy.getCellSize();
|
||||
private ErasureCodingPolicy ecPolicy;
|
||||
private int dataBlocks;
|
||||
private int parityBlocks;
|
||||
private int cellSize;
|
||||
private final int stripesPerBlock = 4;
|
||||
private final int blockSize = cellSize * stripesPerBlock;
|
||||
private final int blockGroupSize = blockSize * dataBlocks;
|
||||
private int blockSize;
|
||||
private int blockGroupSize;
|
||||
|
||||
private static final int FLUSH_POS =
|
||||
9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
|
||||
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize erasure coding policy.
|
||||
*/
|
||||
@Before
|
||||
public void init(){
|
||||
ecPolicy = getEcPolicy();
|
||||
dataBlocks = ecPolicy.getNumDataUnits();
|
||||
parityBlocks = ecPolicy.getNumParityUnits();
|
||||
cellSize = ecPolicy.getCellSize();
|
||||
blockSize = cellSize * stripesPerBlock;
|
||||
blockGroupSize = blockSize * dataBlocks;
|
||||
dnIndexSuite = getDnIndexSuite();
|
||||
lengths = newLengths();
|
||||
}
|
||||
|
||||
List<Integer> newLengths() {
|
||||
final List<Integer> lens = new ArrayList<>();
|
||||
lens.add(FLUSH_POS + 2);
|
||||
|
@ -104,7 +123,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
|||
return lens;
|
||||
}
|
||||
|
||||
private final int[][] dnIndexSuite = getDnIndexSuite();
|
||||
private int[][] dnIndexSuite;
|
||||
|
||||
private int[][] getDnIndexSuite() {
|
||||
final int maxNumLevel = 2;
|
||||
|
@ -167,7 +186,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
|||
return positions;
|
||||
}
|
||||
|
||||
private final List<Integer> lengths = newLengths();
|
||||
private List<Integer> lengths;
|
||||
|
||||
Integer getLength(int i) {
|
||||
return i >= 0 && i < lengths.size()? lengths.get(i): null;
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||
|
||||
/**
|
||||
* This tests read operation of DFS striped file with XOR-2-1-64k erasure code
|
||||
* policy.
|
||||
*/
|
||||
public class TestDFSXORStripedInputStream extends TestDFSStripedInputStream{
|
||||
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
HdfsConstants.XOR_2_1_POLICY_ID);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||
|
||||
/**
|
||||
* This tests write operation of DFS striped file with XOR-2-1-64k erasure code
|
||||
* policy.
|
||||
*/
|
||||
public class TestDFSXORStripedOutputStream extends TestDFSStripedOutputStream{
|
||||
|
||||
@Override
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
HdfsConstants.XOR_2_1_POLICY_ID);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||
|
||||
/**
|
||||
* This tests write operation of DFS striped file with XOR-2-1-64k erasure code
|
||||
* policy when there is data node failure.
|
||||
*/
|
||||
public class TestDFSXORStripedOutputStreamWithFailure
|
||||
extends TestDFSStripedOutputStreamWithFailure{
|
||||
|
||||
@Override
|
||||
public ErasureCodingPolicy getEcPolicy() {
|
||||
return ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
HdfsConstants.XOR_2_1_POLICY_ID);
|
||||
}
|
||||
}
|
|
@ -1,430 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestProportionalCapacityPreemptionPolicyForReservedContainers
|
||||
extends ProportionalCapacityPreemptionPolicyMockFramework {
|
||||
@Before
|
||||
public void setup() {
|
||||
super.setup();
|
||||
conf.setBoolean(
|
||||
CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
|
||||
true);
|
||||
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionForSimpleReservedContainer() throws IOException {
|
||||
/**
|
||||
* The simplest test of reserved container, Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
* Guaranteed resource of a/b are 50:50
|
||||
* Total cluster resource = 100
|
||||
* - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
|
||||
* container is 1.
|
||||
* - B has am container at n1, and reserves 1 container with size = 9 at n1,
|
||||
* so B needs to preempt 9 containers from A at n1 instead of randomly
|
||||
* preempt from n1 and n2.
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 100 9 9]);" + //root
|
||||
"-a(=[50 100 90 0]);" + // a
|
||||
"-b(=[50 100 10 9 9])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,1,n1,,45,false)" // 45 in n1
|
||||
+ "(1,1,n2,,45,false);" + // 45 in n2
|
||||
"b\t" // app2 in b
|
||||
+ "(1,1,n1,,1,false)" // AM container in n1
|
||||
+ "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
// Total 5 preempted from app1 at n1, don't preempt container from other
|
||||
// app/node
|
||||
verify(mDisp, times(5)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
verify(mDisp, times(5)).handle(
|
||||
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUseReservedAndFifoSelectorTogether() throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
* Guaranteed resource of a/b are 30:70
|
||||
* Total cluster resource = 100
|
||||
* - A has 45 containers on two node, n1 has 10, n2 has 35, size of each
|
||||
* container is 1.
|
||||
* - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
|
||||
* B also has 20 pending resources.
|
||||
* so B needs to preempt:
|
||||
* - 10 containers from n1 (for reserved)
|
||||
* - 5 containers from n2 for pending resources
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 100 70 10]);" + //root
|
||||
"-a(=[30 100 45 0]);" + // a
|
||||
"-b(=[70 100 55 70 50])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,1,n2,,35,false)" // 35 in n2
|
||||
+ "(1,1,n1,,10,false);" + // 10 in n1
|
||||
"b\t" // app2 in b
|
||||
+ "(1,1,n2,,5,false)" // 5 in n2
|
||||
+ "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
verify(mDisp, times(15)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
verify(mDisp, times(10)).handle(
|
||||
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(5)).handle(
|
||||
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n2", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReservedSelectorSkipsAMContainer() throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
* Guaranteed resource of a/b are 30:70
|
||||
* Total cluster resource = 100
|
||||
* - A has 45 containers on two node, n1 has 10, n2 has 35, size of each
|
||||
* container is 1.
|
||||
* - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
|
||||
* B also has 20 pending resources.
|
||||
*
|
||||
* Ideally B needs to preempt:
|
||||
* - 10 containers from n1 (for reserved)
|
||||
* - 5 containers from n2 for pending resources
|
||||
*
|
||||
* However, since one AM container is located at n1 (from queueA), we cannot
|
||||
* preempt 10 containers from n1 for reserved container. Instead, we will
|
||||
* preempt 15 containers from n2, since containers from queueA launched in n2
|
||||
* are later than containers from queueA launched in n1 (FIFO order of containers)
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 100 70 10]);" + //root
|
||||
"-a(=[30 100 45 0]);" + // a
|
||||
"-b(=[70 100 55 70 50])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,1,n1,,10,false)" // 10 in n1
|
||||
+ "(1,1,n2,,35,false);" +// 35 in n2
|
||||
"b\t" // app2 in b
|
||||
+ "(1,1,n2,,5,false)" // 5 in n2
|
||||
+ "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
verify(mDisp, times(15)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
verify(mDisp, times(0)).handle(
|
||||
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(15)).handle(
|
||||
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n2", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionForReservedContainerRespectGuaranteedResource()
|
||||
throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
* Guaranteed resource of a/b are 85:15
|
||||
* Total cluster resource = 100
|
||||
* - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
|
||||
* container is 1.
|
||||
* - B has am container at n1, and reserves 1 container with size = 9 at n1,
|
||||
*
|
||||
* If we preempt 9 containers from queue-A, queue-A will be below its
|
||||
* guaranteed resource = 90 - 9 = 81 < 85.
|
||||
*
|
||||
* So no preemption will take place
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 100 9 9]);" + //root
|
||||
"-a(=[85 100 90 0]);" + // a
|
||||
"-b(=[15 100 10 9 9])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,1,n1,,45,false)" // 45 in n1
|
||||
+ "(1,1,n2,,45,false);" + // 45 in n2
|
||||
"b\t" // app2 in b
|
||||
+ "(1,1,n1,,1,false)" // AM container in n1
|
||||
+ "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionForReservedContainerWhichHasAvailableResource()
|
||||
throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
*
|
||||
* Guaranteed resource of a/b are 50:50
|
||||
* Total cluster resource = 100
|
||||
* - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
|
||||
* container is 1.
|
||||
* - B has am container at n1, and reserves 1 container with size = 9 at n1,
|
||||
*
|
||||
* So we can get 4 containers preempted after preemption.
|
||||
* (reserved 5 + preempted 4) = 9
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 99 9 9]);" + //root
|
||||
"-a(=[50 100 90 0]);" + // a
|
||||
"-b(=[50 100 9 9 9])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,1,n1,,45,false)" // 45 in n1
|
||||
+ "(1,1,n2,,45,false);" + // 45 in n2
|
||||
"b\t" // app2 in b
|
||||
+ "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
// Total 4 preempted from app1 at n1, don't preempt container from other
|
||||
// app/node
|
||||
verify(mDisp, times(4)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n2", 1))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource()
|
||||
throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
*
|
||||
* Guaranteed resource of a/b are 50:50
|
||||
* Total cluster resource = 100
|
||||
* - A has 45 containers on two node, size of each container is 2,
|
||||
* n1 has 23, n2 has 22
|
||||
* - B reserves 1 container with size = 9 at n1,
|
||||
*
|
||||
* So we can get 4 containers (total-resource = 8) preempted after
|
||||
* preemption. Actual required is 3.5, but we need to preempt integer
|
||||
* number of containers
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig = // n1 / n2 has no label
|
||||
"n1= res=50;" +
|
||||
"n2= res=50";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 99 9 9]);" + //root
|
||||
"-a(=[50 100 90 0]);" + // a
|
||||
"-b(=[50 100 9 9 9])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,2,n1,,24,false)" // 48 in n1
|
||||
+ "(1,2,n2,,23,false);" + // 46 in n2
|
||||
"b\t" // app2 in b
|
||||
+ "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
// Total 4 preempted from app1 at n1, don't preempt container from other
|
||||
// app/node
|
||||
verify(mDisp, times(4)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n2", 1))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionForReservedContainerRespectAvailableResources()
|
||||
throws IOException {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* </pre>
|
||||
*
|
||||
* Guaranteed resource of a/b are 50:50
|
||||
* Total cluster resource = 100, 4 nodes, 25 on each node
|
||||
* - A has 10 containers on every node, size of container is 2
|
||||
* - B reserves 1 container with size = 9 at n1,
|
||||
*
|
||||
* So even if we cannot allocate container for B now, no preemption should
|
||||
* happen since there're plenty of available resources.
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig =
|
||||
"n1= res=25;" +
|
||||
"n2= res=25;" +
|
||||
"n3= res=25;" +
|
||||
"n4= res=25;";
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending,reserved
|
||||
"root(=[100 100 89 9 9]);" + //root
|
||||
"-a(=[50 100 80 0]);" + // a
|
||||
"-b(=[50 100 9 9 9])"; // b
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a\t" // app1 in a
|
||||
+ "(1,2,n1,,10,false)" // 10 in n1
|
||||
+ "(1,2,n2,,10,false)" // 10 in n2
|
||||
+ "(1,2,n3,,10,false)" // 10 in n3
|
||||
+ "(1,2,n4,,10,false);" + // 10 in n4
|
||||
"b\t" // app2 in b
|
||||
+ "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
// No preemption should happen
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n1", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n2", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n3", 1))));
|
||||
verify(mDisp, times(0)).handle(argThat(
|
||||
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
|
||||
NodeId.newInstance("n4", 1))));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue