HDFS-14027. DFSStripedOutputStream should implement both hsync methods.

(cherry picked from commit db7e636824)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java

(cherry picked from commit 399645ebc1)
This commit is contained in:
Xiao Chen 2018-10-29 19:05:52 -07:00
parent 0f34ff7722
commit 20c9a12bc7
2 changed files with 35 additions and 13 deletions

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -956,11 +957,22 @@ public class DFSStripedOutputStream extends DFSOutputStream
@Override @Override
public void hflush() { public void hflush() {
// not supported yet // not supported yet
LOG.debug("DFSStripedOutputStream does not support hflush. "
+ "Caller should check StreamCapabilities before calling.");
} }
@Override @Override
public void hsync() { public void hsync() {
// not supported yet // not supported yet
LOG.debug("DFSStripedOutputStream does not support hsync. "
+ "Caller should check StreamCapabilities before calling.");
}
@Override
public void hsync(EnumSet<SyncFlag> syncFlags) {
// not supported yet
LOG.debug("DFSStripedOutputStream does not support hsync {}. "
+ "Caller should check StreamCapabilities before calling.", syncFlags);
} }
@Override @Override

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability; import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -195,19 +198,26 @@ public class TestDFSStripedOutputStream {
public void testStreamFlush() throws Exception { public void testStreamFlush() throws Exception {
final byte[] bytes = StripedFileTestUtil.generateBytes(blockSize * final byte[] bytes = StripedFileTestUtil.generateBytes(blockSize *
dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123); dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123);
FSDataOutputStream os = fs.create(new Path("/ec-file-1")); try (FSDataOutputStream os = fs.create(new Path("/ec-file-1"))) {
assertFalse("DFSStripedOutputStream should not have hflush() " + assertFalse(
"capability yet!", os.hasCapability( "DFSStripedOutputStream should not have hflush() capability yet!",
StreamCapability.HFLUSH.getValue())); os.hasCapability(StreamCapability.HFLUSH.getValue()));
assertFalse("DFSStripedOutputStream should not have hsync() " + assertFalse(
"capability yet!", os.hasCapability( "DFSStripedOutputStream should not have hsync() capability yet!",
StreamCapability.HSYNC.getValue())); os.hasCapability(StreamCapability.HSYNC.getValue()));
InputStream is = new ByteArrayInputStream(bytes); try (InputStream is = new ByteArrayInputStream(bytes)) {
IOUtils.copyBytes(is, os, bytes.length); IOUtils.copyBytes(is, os, bytes.length);
os.hflush(); os.hflush();
IOUtils.copyBytes(is, os, bytes.length); IOUtils.copyBytes(is, os, bytes.length);
os.hsync(); os.hsync();
os.close(); IOUtils.copyBytes(is, os, bytes.length);
}
assertTrue("stream is not a DFSStripedOutputStream",
os.getWrappedStream() instanceof DFSStripedOutputStream);
final DFSStripedOutputStream dfssos =
(DFSStripedOutputStream) os.getWrappedStream();
dfssos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
} }
private void testOneFile(String src, int writeBytes) throws Exception { private void testOneFile(String src, int writeBytes) throws Exception {