HADOOP-13327 Output Stream Specification. (#2587)

This defines what output streams, and especially those which implement
Syncable, are meant to do, and documents where implementations (HDFS; S3)
don't. With tests.

The file:// FileSystem now supports Syncable if an application calls
FileSystem.setWriteChecksum(false) before creating a file: checksumming
and Syncable.hsync() are incompatible.
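Concretely, a minimal sketch of the new local-FS behaviour (not part of the patch; the class name and path are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class LocalSyncableDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/sync-demo.txt");   // illustrative path

    fs.setWriteChecksum(false);  // checksumming and hsync() are incompatible
    try (FSDataOutputStream out = fs.create(path, true)) {
      // with checksums disabled the stream now declares the capability
      boolean syncable = out.hasCapability(StreamCapabilities.HSYNC);
      out.write('x');
      out.hsync();               // forwarded to FileDescriptor.sync()
      System.out.println("hsync supported: " + syncable);
    }
  }
}
```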

Contributed by Steve Loughran.
Steve Loughran 2021-02-10 10:28:59 +00:00 committed by GitHub
parent a8bd516e39
commit 798df6d699
33 changed files with 1561 additions and 99 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@ -312,10 +313,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
return StoreImplementationUtils.hasCapability(out, capability);
}
@Override

View File

@ -21,6 +21,7 @@ import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.StringJoiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -153,4 +154,12 @@ public class BufferedFSInputStream extends BufferedInputStream
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
@Override
public String toString() {
return new StringJoiner(", ",
BufferedFSInputStream.class.getSimpleName() + "[", "]")
.add("in=" + in)
.toString();
}
}

View File

@ -36,6 +36,6 @@ public interface CanSetDropBehind {
* UnsupportedOperationException If this stream doesn't support
* setting the drop-behind.
*/
public void setDropBehind(Boolean dropCache)
void setDropBehind(Boolean dropCache)
throws IOException, UnsupportedOperationException;
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
/****************************************************************
* Abstract Checksumed FileSystem.
@ -479,12 +480,15 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
/**
* Probe the inner stream for a capability.
*
* Syncable operations are rejected before being passed down.
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
if (isProbeForSyncable(capability)) {
return false;
}
return datas.hasCapability(capability);
}
}

View File

@ -29,6 +29,7 @@ import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
@ -237,10 +238,7 @@ public class FSDataInputStream extends DataInputStream
@Override
public boolean hasCapability(String capability) {
if (in instanceof StreamCapabilities) {
return ((StreamCapabilities) in).hasCapability(capability);
}
return false;
return StoreImplementationUtils.hasCapability(in, capability);
}
/**

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
@ -126,10 +127,7 @@ public class FSDataOutputStream extends DataOutputStream
@Override
public boolean hasCapability(String capability) {
if (wrappedStream instanceof StreamCapabilities) {
return ((StreamCapabilities) wrappedStream).hasCapability(capability);
}
return false;
return StoreImplementationUtils.hasCapability(wrappedStream, capability);
}
@Override // Syncable

View File

@ -33,7 +33,8 @@ import java.util.zip.Checksum;
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
abstract public class FSOutputSummer extends OutputStream {
abstract public class FSOutputSummer extends OutputStream implements
StreamCapabilities {
// data checksum
private final DataChecksum sum;
// internal buffer for storing data before it is checksumed
@ -254,4 +255,9 @@ abstract public class FSOutputSummer extends OutputStream {
protected synchronized void resetChecksumBufSize() {
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
}
@Override
public boolean hasCapability(String capability) {
return false;
}
}

View File

@ -43,10 +43,12 @@ import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@ -137,8 +139,13 @@ public class RawLocalFileSystem extends FileSystem {
STREAM_READ_SKIP_BYTES)
.build();
/** Reference to the bytes read counter for slightly faster counting. */
private final AtomicLong bytesRead;
public LocalFSFileInputStream(Path f) throws IOException {
fis = new FileInputStream(pathToFile(f));
bytesRead = ioStatistics.getCounterReference(
STREAM_READ_BYTES);
}
@Override
@ -161,8 +168,8 @@ public class RawLocalFileSystem extends FileSystem {
return false;
}
/*
* Just forward to the fis
/**
* Just forward to the fis.
*/
@Override
public int available() throws IOException { return fis.available(); }
@ -178,7 +185,7 @@ public class RawLocalFileSystem extends FileSystem {
if (value >= 0) {
this.position++;
statistics.incrementBytesRead(1);
ioStatistics.incrementCounter(STREAM_READ_BYTES);
bytesRead.addAndGet(1);
}
return value;
} catch (IOException e) { // unexpected exception
@ -196,7 +203,7 @@ public class RawLocalFileSystem extends FileSystem {
if (value > 0) {
this.position += value;
statistics.incrementBytesRead(value);
ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
bytesRead.addAndGet(value);
}
return value;
} catch (IOException e) { // unexpected exception
@ -285,7 +292,7 @@ public class RawLocalFileSystem extends FileSystem {
* For create()'s FSOutputStream.
*********************************************************/
final class LocalFSFileOutputStream extends OutputStream implements
IOStatisticsSource, StreamCapabilities {
IOStatisticsSource, StreamCapabilities, Syncable {
private FileOutputStream fos;
/**
@ -354,6 +361,21 @@ public class RawLocalFileSystem extends FileSystem {
}
}
@Override
public void hflush() throws IOException {
flush();
}
/**
* HSync calls sync on the file descriptor after a local flush() call.
* @throws IOException failure
*/
@Override
public void hsync() throws IOException {
flush();
fos.getFD().sync();
}
@Override
public boolean hasCapability(String capability) {
// a bit inefficient, but intended to make it easier to add
@ -362,7 +384,7 @@ public class RawLocalFileSystem extends FileSystem {
case StreamCapabilities.IOSTATISTICS:
return true;
default:
return false;
return StoreImplementationUtils.isProbeForSyncable(capability);
}
}

View File

@ -34,7 +34,11 @@ import org.apache.hadoop.classification.InterfaceStability;
public interface StreamCapabilities {
/**
* Stream hflush capability implemented by {@link Syncable#hflush()}.
*
* Use the {@link #HSYNC} probe to check for the support of Syncable;
* it's the presence of {@code hsync()} which matters.
*/
@Deprecated
String HFLUSH = "hflush";
/**
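Following the deprecation above, callers should probe with the HSYNC constant before relying on Syncable behavior. A hedged sketch of that pattern (the helper class and method names are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;

final class SyncHelper {
  private SyncHelper() {}

  static void persist(FSDataOutputStream out) throws IOException {
    // probe HSYNC, not the deprecated HFLUSH constant
    if (out.hasCapability(StreamCapabilities.HSYNC)) {
      out.hsync();   // full Syncable support is declared
    } else {
      out.flush();   // best-effort fallback; no durability guarantee
    }
  }
}
```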

View File

@ -23,20 +23,24 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** This interface for flush/sync operation. */
/**
* This is the interface for flush/sync operations.
* Consult the Hadoop filesystem specification for the definition of the
* semantics of these operations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Stable
public interface Syncable {
/** Flush out the data in client's user buffer. After the return of
* this call, new readers will see the data.
* @throws IOException if any error occurs
*/
public void hflush() throws IOException;
void hflush() throws IOException;
/** Similar to posix fsync, flush out the data in client's user buffer
* all the way to the disk device (but the disk may have it in its cache).
* @throws IOException if error occurs
*/
public void hsync() throws IOException;
void hsync() throws IOException;
}
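To make the two guarantees concrete, a short hedged sketch (class and method names are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class SyncableSemanticsDemo {
  private SyncableSemanticsDemo() {}

  static void writeDurably(FileSystem fs, Path path, byte[] data)
      throws IOException {
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(data);
      out.hflush();   // a new reader of `path` now sees `data`
      out.hsync();    // `data` is also pushed towards the disk device
    }
  }
}
```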

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StreamCapabilities;
import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH;
import static org.apache.hadoop.fs.StreamCapabilities.HSYNC;
/**
* Utility classes to help implement filesystems and streams.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class StoreImplementationUtils {
private StoreImplementationUtils() {
}
/**
* Check whether a capability probe is for {@link StreamCapabilities#HSYNC}
* or {@link StreamCapabilities#HFLUSH}, i.e. the
* {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality.
* @param capability capability string.
* @return true if either refers to one of the Syncable operations.
*/
public static boolean isProbeForSyncable(String capability) {
return capability.equalsIgnoreCase(HSYNC) ||
capability.equalsIgnoreCase(HFLUSH);
}
/**
* Probe for an object having a capability; returns true
* if the stream implements {@link StreamCapabilities} and its
* {@code hasCapability()} method returns true for the capability.
* This is a package-private method intended to provide a common
* implementation for input and output streams;
* the {@link StreamCapabilities#hasCapability(String)} call is for public use.
* @param object object to probe.
* @param capability capability to probe for
* @return true if the object implements stream capabilities and
* declares that it supports the capability.
*/
static boolean objectHasCapability(Object object, String capability) {
if (object instanceof StreamCapabilities) {
return ((StreamCapabilities) object).hasCapability(capability);
}
return false;
}
/**
* Probe for an output stream having a capability; returns true
* if the stream implements {@link StreamCapabilities} and its
* {@code hasCapability()} method returns true for the capability.
* @param out output stream
* @param capability capability to probe for
* @return true if the stream declares that it supports the capability.
*/
public static boolean hasCapability(OutputStream out, String capability) {
return objectHasCapability(out, capability);
}
/**
* Probe for an input stream having a capability; returns true
* if the stream implements {@link StreamCapabilities} and its
* {@code hasCapability()} method returns true for the capability.
* @param in input stream
* @param capability capability to probe for
* @return true if the stream declares that it supports the capability.
*/
public static boolean hasCapability(InputStream in, String capability) {
return objectHasCapability(in, capability);
}
}
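As a usage illustration, a hypothetical wrapper stream (not part of the patch) that delegates its capability probes through this class, exactly as the changes to CryptoOutputStream and FSDataOutputStream above do:

```java
import java.io.FilterOutputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;

class PassthroughOutputStream extends FilterOutputStream
    implements StreamCapabilities {

  PassthroughOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public boolean hasCapability(String capability) {
    // true only if `out` implements StreamCapabilities and declares it;
    // no instanceof boilerplate needed at each call site
    return StoreImplementationUtils.hasCapability(out, capability);
  }
}
```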

View File

@ -664,11 +664,15 @@ For instance, HDFS may raise an `InvalidPathException`.
result = FSDataOutputStream
The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`.
A zero byte file MUST exist at the end of the specified path, visible to all.
The updated (valid) FileSystem MUST contain all the parent directories of the path, as created by `mkdirs(parent(p))`.
The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
`FS.Files[p]`
The behavior of the returned stream is covered in [Output](outputstream.html).
#### Implementation Notes
* Some implementations split the create into a check for the file existing
@ -677,10 +681,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne
clients creating files with `overwrite==true` to fail if the file is created
by another client between the two tests.
* S3A, Swift and potentially other Object Stores do not currently change the FS state
* S3A, Swift and potentially other Object Stores do not currently change the `FS` state
until the output stream `close()` operation is completed.
This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`,
and potentially confuse file/directory logic
This is a significant difference between the behavior of object stores
and that of filesystems, as it allows >1 client to create a file with `overwrite=false`,
and potentially confuse file/directory logic. In particular, using `create()` to acquire
an exclusive lock on a file (whoever creates the file without an error is considered
the holder of the lock) may not be a safe algorithm to use when working with object stores.
* Object stores may create an empty file as a marker when a file is created.
However, object stores with `overwrite=true` semantics may not implement this atomically,
so creating files with `overwrite=false` cannot be used as an implicit exclusion
mechanism between processes.
* The Local FileSystem raises a `FileNotFoundException` when trying to create a file over
a directory, hence it is listed as an exception that MAY be raised when
@ -692,6 +704,8 @@ this precondition fails.
Make a `FSDataOutputStreamBuilder` to specify the parameters to create a file.
The behavior of the returned stream is covered in [Output](outputstream.html).
#### Implementation Notes
`createFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make
@ -717,17 +731,21 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep
#### Postconditions
FS
FS' = FS
result = FSDataOutputStream
Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`
by appending data to the existing list.
The behavior of the returned stream is covered in [Output](outputstream.html).
### `FSDataOutputStreamBuilder appendFile(Path p)`
Make a `FSDataOutputStreamBuilder` to specify the parameters to append to an
existing file.
The behavior of the returned stream is covered in [Output](outputstream.html).
#### Implementation Notes
`appendFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make

View File

@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Notation](notation.html)
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)

File diff suppressed because it is too large

View File

@ -18,23 +18,31 @@
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.junit.Test;
import org.junit.AssumptionViolatedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
* Test creating files, overwrite options etc.
@ -42,6 +50,9 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
public abstract class AbstractContractCreateTest extends
AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractCreateTest.class);
/**
* How long to wait for a path to become visible.
*/
@ -436,4 +447,145 @@ public abstract class AbstractContractCreateTest extends
writeDataset(fs, path, data, data.length, 1024 * 1024,
true);
}
@Test
public void testSyncable() throws Throwable {
describe("test declared and actual Syncable behaviors");
FileSystem fs = getFileSystem();
boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
boolean supportsSync = isSupported(SUPPORTS_HSYNC);
boolean metadataUpdatedOnHSync = isSupported(METADATA_UPDATED_ON_HSYNC);
validateSyncableSemantics(fs,
supportsSync,
supportsFlush,
metadataUpdatedOnHSync);
}
/**
* Validate the semantics of syncable.
* @param fs filesystem
* @param supportsSync sync is present
* @param supportsFlush flush is present.
* @param metadataUpdatedOnHSync Is the metadata updated after an hsync?
* @throws IOException failure
*/
protected void validateSyncableSemantics(final FileSystem fs,
final boolean supportsSync,
final boolean supportsFlush,
final boolean metadataUpdatedOnHSync)
throws IOException {
Path path = methodPath();
LOG.info("Expecting files under {} to have supportsSync={}"
+ " and supportsFlush={}; metadataUpdatedOnHSync={}",
path, supportsSync, supportsFlush, metadataUpdatedOnHSync);
try (FSDataOutputStream out = fs.create(path, true)) {
LOG.info("Created output stream {}", out);
// probe the stream for flush/sync support; the declared capabilities
// must match what the contract expects
String[] hflushCapabilities = {
StreamCapabilities.HFLUSH
};
String[] hsyncCapabilities = {
StreamCapabilities.HSYNC
};
if (supportsFlush) {
assertCapabilities(out, hflushCapabilities, null);
} else {
assertCapabilities(out, null, hflushCapabilities);
}
if (supportsSync) {
assertCapabilities(out, hsyncCapabilities, null);
} else {
assertCapabilities(out, null, hsyncCapabilities);
}
// write one byte, then hflush it
out.write('a');
try {
out.hflush();
if (!supportsFlush) {
// FSDataOutputStream silently downgrades to flush() here.
// This is not good, but if changed some applications
// break writing to some stores.
LOG.warn("FS doesn't support Syncable.hflush(),"
+ " but doesn't reject it either.");
}
} catch (UnsupportedOperationException e) {
if (supportsFlush) {
throw new AssertionError("hflush not supported", e);
}
}
// write a second byte, then hsync it.
out.write('b');
try {
out.hsync();
} catch (UnsupportedOperationException e) {
if (supportsSync) {
throw new AssertionError("HSync not supported", e);
}
}
if (supportsSync) {
// if sync really worked, data MUST be visible here
// first the metadata which MUST be present
final FileStatus st = fs.getFileStatus(path);
if (metadataUpdatedOnHSync) {
// not all stores reliably update it, HDFS/webHDFS in particular
assertEquals("Metadata not updated during write " + st,
2, st.getLen());
}
// there's no way to verify durability, but we can
// at least verify a new file input stream reads
// the data
try (FSDataInputStream in = fs.open(path)) {
assertEquals('a', in.read());
assertEquals('b', in.read());
assertEquals(-1, in.read());
LOG.info("Successfully read synced data on a new reader {}", in);
}
} else {
// no sync. Let's do a flush and see what happens.
out.flush();
// Now look at the filesystem.
try (FSDataInputStream in = fs.open(path)) {
int c = in.read();
if (c == -1) {
// nothing was synced; sync and flush really aren't there.
LOG.info("sync and flush are declared unsupported"
+ " - flushed changes were not saved");
} else {
LOG.info("sync and flush are declared unsupported"
+ " - but the stream does offer some sync/flush semantics");
}
// close outside a finally as we do want to see any exception raised.
in.close();
} catch (FileNotFoundException e) {
// that's OK if it's an object store, but not if it's a real
// FS
if (!isSupported(IS_BLOBSTORE)) {
throw e;
} else {
LOG.warn(
"Output file was not created; this is an object store with different"
+ " visibility semantics");
}
}
}
// close the output stream
out.close();
final String stats = ioStatisticsSourceToString(out);
if (!stats.isEmpty()) {
LOG.info("IOStatistics {}", stats);
}
}
}
}

View File

@ -241,4 +241,19 @@ public interface ContractOptions {
*/
String TEST_RANDOM_SEEK_COUNT = "test.random-seek-count";
/**
* Is hflush supported in API and StreamCapabilities?
*/
String SUPPORTS_HFLUSH = "supports-hflush";
/**
* Is hsync supported in API and StreamCapabilities?
*/
String SUPPORTS_HSYNC = "supports-hsync";
/**
* Is the metadata updated after an hsync?
* HDFS does not do this.
*/
String METADATA_UPDATED_ON_HSYNC = "metadata_updated_on_hsync";
}

View File

@ -1542,19 +1542,49 @@ public class ContractTestUtils extends Assert {
StreamCapabilities source = (StreamCapabilities) stream;
if (shouldHaveCapabilities != null) {
for (String shouldHaveCapability : shouldHaveCapabilities) {
assertTrue("Should have capability: " + shouldHaveCapability,
assertTrue("Should have capability: " + shouldHaveCapability
+ " in " + source,
source.hasCapability(shouldHaveCapability));
}
}
if (shouldNotHaveCapabilities != null) {
for (String shouldNotHaveCapability : shouldNotHaveCapabilities) {
assertFalse("Should not have capability: " + shouldNotHaveCapability,
assertFalse("Should not have capability: " + shouldNotHaveCapability
+ " in " + source,
source.hasCapability(shouldNotHaveCapability));
}
}
}
/**
* Custom assert to verify capabilities supported by
* an object through {@link StreamCapabilities}.
*
* @param source The object to test for StreamCapabilities
* @param capabilities The list of expected capabilities
*/
public static void assertHasStreamCapabilities(
final Object source,
final String... capabilities) {
assertCapabilities(source, capabilities, null);
}
/**
* Custom assert to verify capabilities NOT supported by
* an object through {@link StreamCapabilities}.
*
* @param source The object to test for StreamCapabilities
* @param capabilities The list of capabilities which must not be
* supported.
*/
public static void assertLacksStreamCapabilities(
final Object source,
final String... capabilities) {
assertCapabilities(source, null, capabilities);
}
/**
* Custom assert to test {@link PathCapabilities}.
*
@ -1569,7 +1599,8 @@ public class ContractTestUtils extends Assert {
for (String shouldHaveCapability: capabilities) {
assertTrue("Should have capability: " + shouldHaveCapability
+ " under " + path,
+ " under " + path
+ " in " + source,
source.hasPathCapability(path, shouldHaveCapability));
}
}

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.fs.contract.localfs;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
@ -29,4 +32,17 @@ public class TestLocalFSContractCreate extends AbstractContractCreateTest {
return new LocalFSContract(conf);
}
@Test
public void testSyncablePassthroughIfChecksumDisabled() throws Throwable {
describe("Create an instance of the local fs, disable the checksum"
+ " and verify that Syncable now works");
LocalFileSystem fs = (LocalFileSystem) getFileSystem();
try (LocalFileSystem lfs = new LocalFileSystem(
fs.getRawFileSystem())) {
// disable checksumming output
lfs.setWriteChecksum(false);
// now the filesystem supports Sync with immediate update of file status
validateSyncableSemantics(lfs, true, true, true);
}
}
}

View File

@ -121,4 +121,14 @@ case sensitivity and permission options are determined at run time from OS type
<value>true</value>
</property>
<property>
<name>fs.contract.supports-settimes</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
</configuration>

View File

@ -127,4 +127,19 @@
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hflush</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hsync</name>
<value>true</value>
</property>
<property>
<name>fs.contract.metadata_updated_on_hsync</name>
<value>true</value>
</property>
</configuration>

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@ -66,7 +67,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.tracing.TraceScope;
import org.slf4j.Logger;
@ -563,13 +563,7 @@ public class DFSOutputStream extends FSOutputSummer
@Override
public boolean hasCapability(String capability) {
switch (StringUtils.toLowerCase(capability)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return true;
default:
return false;
}
return StoreImplementationUtils.isProbeForSyncable(capability);
}
/**

View File

@ -116,4 +116,19 @@
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hflush</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hsync</name>
<value>true</value>
</property>
<property>
<name>fs.contract.metadata_updated_on_hsync</name>
<value>false</value>
</property>
</configuration>

View File

@ -37,6 +37,8 @@ import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@ -70,15 +72,20 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
* is instead done as a single PUT operation.
*
* Unstable: statistics and error handling might evolve.
*
* Syncable is declared as supported so the calls can be
* explicitly rejected.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream extends OutputStream implements
StreamCapabilities, IOStatisticsSource {
StreamCapabilities, IOStatisticsSource, Syncable {
private static final Logger LOG =
LoggerFactory.getLogger(S3ABlockOutputStream.class);
private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable";
/** Owner FileSystem. */
private final S3AFileSystem fs;
@ -546,6 +553,16 @@ class S3ABlockOutputStream extends OutputStream implements
}
}
@Override
public void hflush() throws IOException {
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
}
@Override
public void hsync() throws IOException {
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
}
@Override
public IOStatistics getIOStatistics() {
return iostatistics;
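A hedged sketch of what callers must therefore handle when they invoke `hsync()` without probing first (the helper class name is illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;

final class SyncOrWarn {
  private SyncOrWarn() {}

  static void syncQuietly(FSDataOutputStream out) throws IOException {
    try {
      out.hsync();
    } catch (UnsupportedOperationException e) {
      // S3A raises this rather than pretending the data is durable;
      // the data is NOT persisted until close()
      System.err.println("store does not support hsync(): " + e);
    }
  }
}
```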

View File

@ -116,5 +116,5 @@ public final class InternalConstants {
* problems related to region/endpoint setup, it is currently
* disabled.
*/
public static final boolean AWS_SDK_METRICS_ENABLED = true;
public static final boolean AWS_SDK_METRICS_ENABLED = false;
}

View File

@ -22,6 +22,8 @@ import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import java.io.IOException;
@ -42,7 +44,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class AdlFsOutputStream extends OutputStream implements Syncable {
public final class AdlFsOutputStream extends OutputStream
implements Syncable, StreamCapabilities {
private final ADLFileOutputStream out;
public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration)
@ -79,4 +82,9 @@ public final class AdlFsOutputStream extends OutputStream implements Syncable {
public synchronized void hsync() throws IOException {
out.flush();
}
@Override
public boolean hasCapability(String capability) {
return StoreImplementationUtils.isProbeForSyncable(capability);
}
}

View File

@ -153,4 +153,14 @@
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hflush</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hsync</name>
<value>true</value>
</property>
</configuration>

View File

@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
@ -551,13 +551,7 @@ public class BlockBlobAppendStream extends OutputStream implements Syncable,
if (!compactionEnabled) {
return false;
}
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return true;
default:
return false;
}
return StoreImplementationUtils.isProbeForSyncable(capability);
}
/**

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@ -1052,10 +1053,7 @@ public class NativeAzureFileSystem extends FileSystem {
*/
@Override // StreamCapability
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
return StoreImplementationUtils.hasCapability(out, capability);
}
@Override

View File

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
/**
* Support the Syncable interface on top of a DataOutputStream.
@ -56,10 +57,7 @@ public class SyncableDataOutputStream extends DataOutputStream
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
return StoreImplementationUtils.hasCapability(out, capability);
}
@Override

View File

@ -24,7 +24,6 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
@ -54,6 +53,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
@ -164,13 +164,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
*/
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return supportFlush;
default:
return false;
}
return supportFlush && isProbeForSyncable(capability);
}
/**

View File

@ -27,8 +27,6 @@ import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -39,8 +37,9 @@ import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeNotNull;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
/**
* Test semantics of functions flush, hflush, hsync, and close for block blobs,
@ -192,11 +191,14 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
public void testPageBlobCapabilities() throws IOException {
Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
try (FSDataOutputStream stream = fs.create(path)) {
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
assertCapabilities(stream,
new String[]{
StreamCapabilities.HFLUSH,
StreamCapabilities.HSYNC},
new String[]{
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
StreamCapabilities.UNBUFFER});
stream.write(getRandomBytes());
}
}
@ -285,11 +287,12 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
public void testBlockBlobCapabilities() throws IOException {
Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
try (FSDataOutputStream stream = fs.create(path)) {
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
assertLacksStreamCapabilities(stream,
StreamCapabilities.HFLUSH,
StreamCapabilities.HSYNC,
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
StreamCapabilities.UNBUFFER);
stream.write(getRandomBytes());
}
}
@ -381,11 +384,12 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
public void testBlockBlobCompactionCapabilities() throws IOException {
Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
try (FSDataOutputStream stream = fs.create(path)) {
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
assertHasStreamCapabilities(stream,
StreamCapabilities.HFLUSH,
StreamCapabilities.HSYNC);
assertLacksStreamCapabilities(stream,
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
StreamCapabilities.UNBUFFER);
stream.write(getRandomBytes());
}
}

View File

@ -41,6 +41,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
/**
* Test flush operation.
* This class cannot be run in parallel test mode--check comments in
@ -306,11 +309,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
final Path testFilePath = path(methodName.getMethodName());
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
assertLacksStreamCapabilities(stream,
StreamCapabilities.HFLUSH,
StreamCapabilities.HSYNC,
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
StreamCapabilities.UNBUFFER);
}
}
@ -320,11 +324,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
byte[] buffer = getRandomBytesArray();
final Path testFilePath = path(methodName.getMethodName());
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
assertHasStreamCapabilities(stream,
StreamCapabilities.HFLUSH,
StreamCapabilities.HSYNC);
assertLacksStreamCapabilities(stream,
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
StreamCapabilities.UNBUFFER);
}
}

View File

@ -66,4 +66,20 @@
<name>fs.contract.supports-unbuffer</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hflush</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hsync</name>
<value>true</value>
</property>
<property>
<name>fs.contract.metadata_updated_on_hsync</name>
<value>true</value>
</property>
</configuration>