HDFS-14027. DFSStripedOutputStream should implement both hsync methods.
(cherry picked from commit db7e636824
)
This commit is contained in:
parent
288bc3f1eb
commit
8788489df4
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities;
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -956,11 +957,22 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
@Override
|
@Override
|
||||||
public void hflush() {
|
public void hflush() {
|
||||||
// not supported yet
|
// not supported yet
|
||||||
|
LOG.debug("DFSStripedOutputStream does not support hflush. "
|
||||||
|
+ "Caller should check StreamCapabilities before calling.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void hsync() {
|
public void hsync() {
|
||||||
// not supported yet
|
// not supported yet
|
||||||
|
LOG.debug("DFSStripedOutputStream does not support hsync. "
|
||||||
|
+ "Caller should check StreamCapabilities before calling.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void hsync(EnumSet<SyncFlag> syncFlags) {
|
||||||
|
// not supported yet
|
||||||
|
LOG.debug("DFSStripedOutputStream does not support hsync {}. "
|
||||||
|
+ "Caller should check StreamCapabilities before calling.", syncFlags);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,12 +18,14 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
|
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -196,19 +199,26 @@ public class TestDFSStripedOutputStream {
|
||||||
public void testStreamFlush() throws Exception {
|
public void testStreamFlush() throws Exception {
|
||||||
final byte[] bytes = StripedFileTestUtil.generateBytes(blockSize *
|
final byte[] bytes = StripedFileTestUtil.generateBytes(blockSize *
|
||||||
dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123);
|
dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123);
|
||||||
FSDataOutputStream os = fs.create(new Path("/ec-file-1"));
|
try (FSDataOutputStream os = fs.create(new Path("/ec-file-1"))) {
|
||||||
assertFalse("DFSStripedOutputStream should not have hflush() " +
|
assertFalse(
|
||||||
"capability yet!", os.hasCapability(
|
"DFSStripedOutputStream should not have hflush() capability yet!",
|
||||||
StreamCapability.HFLUSH.getValue()));
|
os.hasCapability(StreamCapability.HFLUSH.getValue()));
|
||||||
assertFalse("DFSStripedOutputStream should not have hsync() " +
|
assertFalse(
|
||||||
"capability yet!", os.hasCapability(
|
"DFSStripedOutputStream should not have hsync() capability yet!",
|
||||||
StreamCapability.HSYNC.getValue()));
|
os.hasCapability(StreamCapability.HSYNC.getValue()));
|
||||||
InputStream is = new ByteArrayInputStream(bytes);
|
try (InputStream is = new ByteArrayInputStream(bytes)) {
|
||||||
IOUtils.copyBytes(is, os, bytes.length);
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
os.hflush();
|
os.hflush();
|
||||||
IOUtils.copyBytes(is, os, bytes.length);
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
os.hsync();
|
os.hsync();
|
||||||
os.close();
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
|
}
|
||||||
|
assertTrue("stream is not a DFSStripedOutputStream",
|
||||||
|
os.getWrappedStream() instanceof DFSStripedOutputStream);
|
||||||
|
final DFSStripedOutputStream dfssos =
|
||||||
|
(DFSStripedOutputStream) os.getWrappedStream();
|
||||||
|
dfssos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testOneFile(String src, int writeBytes) throws Exception {
|
private void testOneFile(String src, int writeBytes) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue