HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)

Contributed by Mikhail Pryakhin
Mike 2020-06-03 13:37:40 +03:00 committed by Steve Loughran
parent c88bf7acc1
commit cf84bec6e3
7 changed files with 314 additions and 46 deletions
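For readers skimming the diff: the user-visible effect is that a stream opened through SFTPFileSystem can now be repositioned with seek(), where it previously threw "Seek not supported". A minimal usage sketch; the host, credentials, and path below are placeholders, not taken from the commit:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SftpSeekDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder endpoint and credentials; any reachable SFTP server works.
    FileSystem fs = FileSystem.get(
        URI.create("sftp://user:password@example.com"), conf);
    try (FSDataInputStream in = fs.open(new Path("/data/file.bin"))) {
      in.seek(1024);     // now repositions instead of throwing IOException
      int b = in.read(); // reads the byte at offset 1024
      System.out.println(b);
    }
  }
}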

SFTPFileSystem.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.fs.sftp;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URLDecoder;
@@ -516,21 +515,22 @@ public class SFTPFileSystem extends FileSystem {
       disconnect(channel);
       throw new IOException(String.format(E_PATH_DIR, f));
     }
-    InputStream is;
     try {
       // the path could be a symbolic link, so get the real path
       absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
-      is = channel.get(absolute.toUri().getPath());
     } catch (SftpException e) {
       throw new IOException(e);
     }
-    return new FSDataInputStream(new SFTPInputStream(is, statistics)){
+    return new FSDataInputStream(
+        new SFTPInputStream(channel, absolute, statistics)){
       @Override
       public void close() throws IOException {
         try {
           super.close();
         } finally {
           disconnect(channel);
         }
       }
     };
   }

SFTPInputStream.java

@@ -15,62 +15,107 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.fs.sftp;

+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;

+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;

 /** SFTP FileSystem input stream. */
 class SFTPInputStream extends FSInputStream {

-  public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
-  public static final String E_NULL_INPUTSTREAM = "Null InputStream";
-  public static final String E_STREAM_CLOSED = "Stream closed";
-
+  private final ChannelSftp channel;
+  private final Path path;
   private InputStream wrappedStream;
   private FileSystem.Statistics stats;
   private boolean closed;
   private long pos;
+  private long nextPos;
+  private long contentLength;

-  SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {
-    if (stream == null) {
-      throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
-    }
-    this.wrappedStream = stream;
+  SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
+      throws IOException {
+    try {
+      this.channel = channel;
+      this.path = path;
       this.stats = stats;
       this.pos = 0;
       this.closed = false;
+      this.wrappedStream = channel.get(path.toUri().getPath());
+      SftpATTRS stat = channel.lstat(path.toString());
+      this.contentLength = stat.getSize();
+    } catch (SftpException e) {
+      throw new IOException(e);
+    }
   }

   @Override
-  public void seek(long position) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+  public synchronized void seek(long position) throws IOException {
+    checkNotClosed();
+    if (position < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+    }
+    nextPos = position;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+    long remaining = contentLength - nextPos;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int) remaining;
+  }
+
+  private void seekInternal() throws IOException {
+    if (pos == nextPos) {
+      return;
+    }
+    if (nextPos > pos) {
+      long skipped = wrappedStream.skip(nextPos - pos);
+      pos = pos + skipped;
+    }
+    if (nextPos < pos) {
+      wrappedStream.close();
+      try {
+        wrappedStream = channel.get(path.toUri().getPath());
+        pos = wrappedStream.skip(nextPos);
+      } catch (SftpException e) {
+        throw new IOException(e);
+      }
+    }
   }

   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+    return false;
   }

   @Override
-  public long getPos() throws IOException {
-    return pos;
+  public synchronized long getPos() throws IOException {
+    return nextPos;
   }

   @Override
   public synchronized int read() throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
+    checkNotClosed();
+    if (this.contentLength == 0 || (nextPos >= contentLength)) {
+      return -1;
     }
+    seekInternal();
     int byteRead = wrappedStream.read();
     if (byteRead >= 0) {
       pos++;
+      nextPos++;
     }
     if (stats != null & byteRead >= 0) {
       stats.incrementBytesRead(1);
@@ -78,23 +123,6 @@ class SFTPInputStream extends FSInputStream {
     return byteRead;
   }

-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
-    }
-    int result = wrappedStream.read(buf, off, len);
-    if (result > 0) {
-      pos += result;
-    }
-    if (stats != null & result > 0) {
-      stats.incrementBytesRead(result);
-    }
-    return result;
-  }
-
   public synchronized void close() throws IOException {
     if (closed) {
       return;
@@ -103,4 +131,12 @@ class SFTPInputStream extends FSInputStream {
     wrappedStream.close();
     closed = true;
   }
+
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
+      );
+    }
+  }
 }
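A note on the design above: seek() is lazy. It only records nextPos; the next read() calls seekInternal(), which skips forward on the live stream, or, for a backward seek, reopens the file through the SFTP channel and skips from offset zero. Stripped of the JSch details, the pattern looks roughly like this sketch (class and method names here are illustrative, not from the patch):

import java.io.IOException;
import java.io.InputStream;

/** Illustrative lazy-seek wrapper, not the actual patch code. */
class LazySeekStream {
  interface StreamOpener { InputStream open() throws IOException; }

  private final StreamOpener opener;
  private InputStream in;
  private long pos;      // physical position of the underlying stream
  private long nextPos;  // logical position requested by seek()

  LazySeekStream(StreamOpener opener) throws IOException {
    this.opener = opener;
    this.in = opener.open();
  }

  void seek(long target) {        // cheap: just remember the target
    nextPos = target;
  }

  int read() throws IOException { // reconcile positions before reading
    if (nextPos > pos) {          // forward: skip on the live stream
      pos += in.skip(nextPos - pos);
    } else if (nextPos < pos) {   // backward: reopen and skip from zero
      in.close();
      in = opener.open();
      pos = in.skip(nextPos);
    }
    int b = in.read();
    if (b >= 0) { pos++; nextPos++; }
    return b;
  }
}

The trade-off: forward seeks cost at most one skip() on the open transfer, while backward seeks pay a full reopen, since the InputStream returned by channel.get() cannot be rewound.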

AbstractFSContract.java

@@ -69,6 +69,14 @@ public abstract class AbstractFSContract extends Configured {
   }

+  /**
+   * Any teardown logic can go here.
+   * @throws IOException IO problems
+   */
+  public void teardown() throws IOException {
+  }
+
   /**
    * Add a configuration resource to this instance's configuration
    * @param resource resource reference
@@ -113,7 +121,7 @@ public abstract class AbstractFSContract extends Configured {
   public abstract FileSystem getTestFileSystem() throws IOException;

   /**
-   * Get the scheme of this FS
+   * Get the scheme of this FS.
    * @return the scheme this FS supports
    */
   public abstract String getScheme();

AbstractFSContractTestBase.java

@@ -213,6 +213,9 @@ public abstract class AbstractFSContractTestBase extends Assert
     Thread.currentThread().setName("teardown");
     LOG.debug("== Teardown ==");
     deleteTestDirInTeardown();
+    if (contract != null) {
+      contract.teardown();
+    }
     LOG.debug("== Teardown complete ==");
   }

SFTPContract.java (new file)

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.sftp;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+
+public class SFTPContract extends AbstractFSContract {
+
+  private static final String CONTRACT_XML = "contract/sftp.xml";
+  private static final URI TEST_URI =
+      URI.create("sftp://user:password@localhost");
+  private final String testDataDir =
+      new FileSystemTestHelper().getTestRootDir();
+  private final Configuration conf;
+  private SshServer sshd;
+
+  public SFTPContract(Configuration conf) {
+    super(conf);
+    addConfResource(CONTRACT_XML);
+    this.conf = conf;
+  }
+
+  @Override
+  public void init() throws IOException {
+    sshd = SshServer.setUpDefaultServer();
+    // ask OS to assign a port
+    sshd.setPort(0);
+    sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+
+    List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
+    userAuthFactories.add(new UserAuthPasswordFactory());
+
+    sshd.setUserAuthFactories(userAuthFactories);
+    sshd.setPasswordAuthenticator((username, password, session) ->
+        username.equals("user") && password.equals("password")
+    );
+
+    sshd.setSubsystemFactories(
+        Collections.singletonList(new SftpSubsystemFactory()));
+
+    sshd.start();
+    int port = sshd.getPort();
+
+    conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
+    conf.setInt("fs.sftp.host.port", port);
+    conf.setBoolean("fs.sftp.impl.disable.cache", true);
+  }
+
+  @Override
+  public void teardown() throws IOException {
+    if (sshd != null) {
+      sshd.stop();
+    }
+  }
+
+  @Override
+  public FileSystem getTestFileSystem() throws IOException {
+    return FileSystem.get(TEST_URI, conf);
+  }
+
+  @Override
+  public String getScheme() {
+    return "sftp";
+  }
+
+  @Override
+  public Path getTestPath() {
+    try {
+      FileSystem fs = FileSystem.get(
+          URI.create("sftp://user:password@localhost"), conf
+      );
+      return fs.makeQualified(new Path(testDataDir));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
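The contract above boots an in-process SSH server with an SFTP subsystem on an ephemeral port (setPort(0)), then publishes the assigned port through fs.sftp.host.port; disabling the FileSystem cache keeps each test run bound to its own server instance. A hypothetical standalone driver, not part of the patch, showing that lifecycle:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.sftp.SFTPContract;

public class SftpContractDriver {
  public static void main(String[] args) throws Exception {
    SFTPContract contract = new SFTPContract(new Configuration());
    contract.init();   // starts the embedded SSHD on an OS-assigned port
    try {
      FileSystem fs = contract.getTestFileSystem();
      System.out.println(fs.exists(new Path("/"))); // round-trip over SFTP
    } finally {
      contract.teardown(); // stops the SSHD
    }
  }
}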

TestSFTPContractSeek.java (new file)

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.sftp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestSFTPContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new SFTPContract(conf);
+  }
+}

contract/sftp.xml (new file)

@@ -0,0 +1,79 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+  <!--
+    SFTP - these options are for testing against a remote unix filesystem.
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+</configuration>