HDFS-7663. Erasure Coding: Append on striped file. Contributed by Ayush Saxena.

This commit is contained in:
Vinayakumar B 2019-03-05 19:26:42 +05:30
parent 0aefe2846f
commit f940ab242d
4 changed files with 145 additions and 15 deletions

View File

@ -119,7 +119,7 @@ public class DFSOutputStream extends FSOutputSummer
protected int packetSize = 0; // write packet size, not including the header. protected int packetSize = 0; // write packet size, not including the header.
protected int chunksPerPacket = 0; protected int chunksPerPacket = 0;
protected long lastFlushOffset = 0; // offset when flush was invoked protected long lastFlushOffset = 0; // offset when flush was invoked
private long initialFileSize = 0; // at time of file open protected long initialFileSize = 0; // at time of file open
private final short blockReplication; // replication factor of file private final short blockReplication; // replication factor of file
protected boolean shouldSyncBlock = false; // force blocks to disk upon close protected boolean shouldSyncBlock = false; // force blocks to disk upon close
private final EnumSet<AddBlockFlag> addBlockFlags; private final EnumSet<AddBlockFlag> addBlockFlags;
@ -391,14 +391,16 @@ public class DFSOutputStream extends FSOutputSummer
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException { throws IOException {
if(stat.getErasureCodingPolicy() != null) {
throw new IOException(
"Not support appending to a striping layout file yet.");
}
try (TraceScope ignored = try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForAppend", src)) { dfsClient.newPathTraceScope("newStreamForAppend", src)) {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, DFSOutputStream out;
progress, lastBlock, stat, checksum, favoredNodes); if (stat.isErasureCoded()) {
out = new DFSStripedOutputStream(dfsClient, src, flags, progress,
lastBlock, stat, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock,
stat, checksum, favoredNodes);
}
out.start(); out.start();
return out; return out;
} }

View File

@ -276,6 +276,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
private final int numAllBlocks; private final int numAllBlocks;
private final int numDataBlocks; private final int numDataBlocks;
private ExtendedBlock currentBlockGroup; private ExtendedBlock currentBlockGroup;
private ExtendedBlock prevBlockGroup4Append;
private final String[] favoredNodes; private final String[] favoredNodes;
private final List<StripedDataStreamer> failedStreamers; private final List<StripedDataStreamer> failedStreamers;
private final Map<Integer, Integer> corruptBlockCountMap; private final Map<Integer, Integer> corruptBlockCountMap;
@ -324,6 +325,16 @@ public class DFSStripedOutputStream extends DFSOutputStream
setCurrentStreamer(0); setCurrentStreamer(0);
} }
/** Construct a new output stream for appending to a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
initialFileSize = stat.getLen(); // length of file when opened
prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
}
private boolean useDirectBuffer() { private boolean useDirectBuffer() {
return encoder.preferDirectBuffer(); return encoder.preferDirectBuffer();
} }
@ -473,12 +484,17 @@ public class DFSStripedOutputStream extends DFSOutputStream
+ Arrays.asList(excludedNodes)); + Arrays.asList(excludedNodes));
// replace failed streamers // replace failed streamers
ExtendedBlock prevBlockGroup = currentBlockGroup;
if (prevBlockGroup4Append != null) {
prevBlockGroup = prevBlockGroup4Append;
prevBlockGroup4Append = null;
}
replaceFailedStreamers(); replaceFailedStreamers();
LOG.debug("Allocating new block group. The previous block group: " LOG.debug("Allocating new block group. The previous block group: "
+ currentBlockGroup); + prevBlockGroup);
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src, final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags()); prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
assert lb.isStriped(); assert lb.isStriped();
// assign the new block to the current block group // assign the new block to the current block group
currentBlockGroup = lb.getBlock(); currentBlockGroup = lb.getBlock();

View File

@ -107,12 +107,6 @@ final class FSDirAppendOp {
} }
final INodeFile file = INodeFile.valueOf(inode, path, true); final INodeFile file = INodeFile.valueOf(inode, path, true);
// not support appending file with striped blocks
if (file.isStriped()) {
throw new UnsupportedOperationException(
"Cannot append to files with striped block " + path);
}
BlockManager blockManager = fsd.getBlockManager(); BlockManager blockManager = fsd.getBlockManager();
final BlockStoragePolicy lpPolicy = blockManager final BlockStoragePolicy lpPolicy = blockManager
.getStoragePolicy("LAZY_PERSIST"); .getStoragePolicy("LAZY_PERSIST");
@ -192,6 +186,10 @@ final class FSDirAppendOp {
LocatedBlock ret = null; LocatedBlock ret = null;
if (!newBlock) { if (!newBlock) {
if (file.isStriped()) {
throw new UnsupportedOperationException(
"Append on EC file without new block is not supported.");
}
FSDirectory fsd = fsn.getFSDirectory(); FSDirectory fsd = fsn.getFSDirectory();
ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0); ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
if (ret != null && delta != null) { if (ret != null && delta != null) {

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import static org.junit.Assert.assertEquals;
/**
* Tests append on erasure coded file.
*/
public class TestStripedFileAppend {
public static final Log LOG = LogFactory.getLog(TestStripedFileAppend.class);
static {
DFSTestUtil.setNameNodeLogLevel(Level.ALL);
}
private static final int NUM_DATA_BLOCKS =
StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
private static final int CELL_SIZE =
StripedFileTestUtil.getDefaultECPolicy().getCellSize();
private static final int NUM_DN = 9;
private static final int STRIPES_PER_BLOCK = 4;
private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
private static final Random RANDOM = new Random();
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private Path dir = new Path("/TestFileAppendStriped");
private HdfsConfiguration conf = new HdfsConfiguration();
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.setErasureCodingPolicy(dir, null);
}
@After
public void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* test simple append to a closed striped file, with NEW_BLOCK flag enabled.
*/
@Test
public void testAppendToNewBlock() throws IOException {
int fileLength = 0;
int totalSplit = 6;
byte[] expected =
StripedFileTestUtil.generateBytes(BLOCK_GROUP_SIZE * totalSplit);
Path file = new Path(dir, "testAppendToNewBlock");
FSDataOutputStream out;
for (int split = 0; split < totalSplit; split++) {
if (split == 0) {
out = dfs.create(file);
} else {
out = dfs.append(file,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
}
int splitLength = RANDOM.nextInt(BLOCK_GROUP_SIZE);
out.write(expected, fileLength, splitLength);
fileLength += splitLength;
out.close();
}
expected = Arrays.copyOf(expected, fileLength);
LocatedBlocks lbs =
dfs.getClient().getLocatedBlocks(file.toString(), 0L, Long.MAX_VALUE);
assertEquals(totalSplit, lbs.getLocatedBlocks().size());
StripedFileTestUtil.verifyStatefulRead(dfs, file, fileLength, expected,
new byte[4096]);
StripedFileTestUtil.verifySeek(dfs, file, fileLength,
StripedFileTestUtil.getDefaultECPolicy(), totalSplit);
}
}