HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)
Contributed by Mikhail Pryakhin.
This commit is contained in:
parent
c0b7b38e22
commit
68d8802624
|
@ -276,6 +276,11 @@
|
|||
<artifactId>sshd-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ftpserver</groupId>
|
||||
<artifactId>ftpserver-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.htrace</groupId>
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.ftp;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.net.URI;
|
||||
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -110,7 +112,9 @@ public class FTPFileSystem extends FileSystem {
|
|||
|
||||
// get port information from uri, (overrides info in conf)
|
||||
int port = uri.getPort();
|
||||
port = (port == -1) ? FTP.DEFAULT_PORT : port;
|
||||
if(port == -1){
|
||||
port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
|
||||
}
|
||||
conf.setInt(FS_FTP_HOST_PORT, port);
|
||||
|
||||
// get user/password information from URI (overrides info in conf)
|
||||
|
@ -340,8 +344,19 @@ public class FTPFileSystem extends FileSystem {
|
|||
// file. The FTP client connection is closed when close() is called on the
|
||||
// FSDataOutputStream.
|
||||
client.changeWorkingDirectory(parent.toUri().getPath());
|
||||
FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
|
||||
.getName()), statistics) {
|
||||
OutputStream outputStream = client.storeFileStream(file.getName());
|
||||
|
||||
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
|
||||
// The ftpClient is an inconsistent state. Must close the stream
|
||||
// which in turn will logout and disconnect from FTP server
|
||||
if (outputStream != null) {
|
||||
IOUtils.closeStream(outputStream);
|
||||
}
|
||||
disconnect(client);
|
||||
throw new IOException("Unable to create file: " + file + ", Aborting");
|
||||
}
|
||||
|
||||
FSDataOutputStream fos = new FSDataOutputStream(outputStream, statistics) {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
|
@ -356,12 +371,6 @@ public class FTPFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
};
|
||||
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
|
||||
// The ftpClient is an inconsistent state. Must close the stream
|
||||
// which in turn will logout and disconnect from FTP server
|
||||
fos.close();
|
||||
throw new IOException("Unable to create file: " + file + ", Aborting");
|
||||
}
|
||||
return fos;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.ftp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.ftpserver.FtpServer;
|
||||
import org.apache.ftpserver.FtpServerFactory;
|
||||
import org.apache.ftpserver.ftplet.Authority;
|
||||
import org.apache.ftpserver.ftplet.FtpException;
|
||||
import org.apache.ftpserver.ftplet.UserManager;
|
||||
import org.apache.ftpserver.impl.DefaultFtpServer;
|
||||
import org.apache.ftpserver.listener.Listener;
|
||||
import org.apache.ftpserver.listener.ListenerFactory;
|
||||
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
|
||||
import org.apache.ftpserver.usermanager.impl.BaseUser;
|
||||
|
||||
/**
|
||||
* Helper class facilitating to manage a local ftp
|
||||
* server for unit tests purposes only.
|
||||
*/
|
||||
public class FtpTestServer {
|
||||
|
||||
private int port;
|
||||
private Path ftpRoot;
|
||||
private UserManager userManager;
|
||||
private FtpServer server;
|
||||
|
||||
public FtpTestServer(Path ftpRoot) {
|
||||
this.ftpRoot = ftpRoot;
|
||||
this.userManager = new PropertiesUserManagerFactory().createUserManager();
|
||||
FtpServerFactory serverFactory = createServerFactory();
|
||||
serverFactory.setUserManager(userManager);
|
||||
this.server = serverFactory.createServer();
|
||||
}
|
||||
|
||||
public FtpTestServer start() throws Exception {
|
||||
server.start();
|
||||
Listener listener = ((DefaultFtpServer) server)
|
||||
.getListeners()
|
||||
.get("default");
|
||||
port = listener.getPort();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Path getFtpRoot() {
|
||||
return ftpRoot;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (!server.isStopped()) {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public BaseUser addUser(String name, String password,
|
||||
Authority... authorities) throws IOException, FtpException {
|
||||
|
||||
BaseUser user = new BaseUser();
|
||||
user.setName(name);
|
||||
user.setPassword(password);
|
||||
Path userHome = Files.createDirectory(ftpRoot.resolve(name));
|
||||
user.setHomeDirectory(userHome.toString());
|
||||
user.setAuthorities(Arrays.asList(authorities));
|
||||
userManager.save(user);
|
||||
return user;
|
||||
}
|
||||
|
||||
private FtpServerFactory createServerFactory() {
|
||||
FtpServerFactory serverFactory = new FtpServerFactory();
|
||||
ListenerFactory defaultListener = new ListenerFactory();
|
||||
defaultListener.setPort(0);
|
||||
serverFactory.addListener("default", defaultListener.createListener());
|
||||
return serverFactory;
|
||||
}
|
||||
}
|
|
@ -17,18 +17,35 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.ftp;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Comparator;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.net.ftp.FTP;
|
||||
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
import org.apache.commons.net.ftp.FTPFile;
|
||||
import org.apache.ftpserver.usermanager.impl.BaseUser;
|
||||
import org.apache.ftpserver.usermanager.impl.WritePermission;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
|
@ -37,9 +54,72 @@ import static org.junit.Assert.assertEquals;
|
|||
*/
|
||||
public class TestFTPFileSystem {
|
||||
|
||||
private FtpTestServer server;
|
||||
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(180000);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start();
|
||||
}
|
||||
|
||||
@After
|
||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||
public void tearDown() throws Exception {
|
||||
if (server != null) {
|
||||
server.stop();
|
||||
Files.walk(server.getFtpRoot())
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.map(java.nio.file.Path::toFile)
|
||||
.forEach(File::delete);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateWithWritePermissions() throws Exception {
|
||||
BaseUser user = server.addUser("test", "password", new WritePermission());
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.set("fs.defaultFS", "ftp:///");
|
||||
configuration.set("fs.ftp.host", "localhost");
|
||||
configuration.setInt("fs.ftp.host.port", server.getPort());
|
||||
configuration.set("fs.ftp.user.localhost", user.getName());
|
||||
configuration.set("fs.ftp.password.localhost", user.getPassword());
|
||||
configuration.setBoolean("fs.ftp.impl.disable.cache", true);
|
||||
|
||||
FileSystem fs = FileSystem.get(configuration);
|
||||
byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
|
||||
try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) {
|
||||
outputStream.write(bytesExpected);
|
||||
}
|
||||
try (FSDataInputStream input = fs.open(new Path("test1.txt"))) {
|
||||
assertThat(bytesExpected, equalTo(IOUtils.readFullyToByteArray(input)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateWithoutWritePermissions() throws Exception {
|
||||
BaseUser user = server.addUser("test", "password");
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.set("fs.defaultFS", "ftp:///");
|
||||
configuration.set("fs.ftp.host", "localhost");
|
||||
configuration.setInt("fs.ftp.host.port", server.getPort());
|
||||
configuration.set("fs.ftp.user.localhost", user.getName());
|
||||
configuration.set("fs.ftp.password.localhost", user.getPassword());
|
||||
configuration.setBoolean("fs.ftp.impl.disable.cache", true);
|
||||
|
||||
FileSystem fs = FileSystem.get(configuration);
|
||||
byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
|
||||
LambdaTestUtils.intercept(
|
||||
IOException.class, "Unable to create file: test1.txt, Aborting",
|
||||
() -> {
|
||||
try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) {
|
||||
out.write(bytesExpected);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFTPDefaultPort() throws Exception {
|
||||
FTPFileSystem ftp = new FTPFileSystem();
|
||||
|
|
Loading…
Reference in New Issue