fix for https://issues.apache.org/activemq/browse/AMQ-2548 don't close the ftp connection until the stream is closed.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-01-21 23:02:28 +00:00
parent e166ae3937
commit a438496645
2 changed files with 114 additions and 98 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.blob;
import java.io.IOException;
import java.io.InputStream;
import java.io.FilterInputStream;
import java.net.ConnectException;
import java.net.URL;
@ -35,36 +36,42 @@ public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
URL url = message.getURL();
setUserInformation(url.getUserInfo());
String connectUrl = url.getHost();
int port = url.getPort() < 1 ? 21 : url.getPort();
FTPClient ftp = new FTPClient();
final FTPClient ftp = new FTPClient();
try {
ftp.connect(connectUrl, port);
ftp.connect(connectUrl, port);
} catch(ConnectException e) {
throw new JMSException("Problem connecting the FTP-server");
throw new JMSException("Problem connecting the FTP-server");
}
if(!ftp.login(ftpUser, ftpPass)) {
ftp.quit();
ftp.quit();
ftp.disconnect();
throw new JMSException("Cant Authentificate to FTP-Server");
}
String path = url.getPath();
String workingDir = path.substring(0, path.lastIndexOf("/"));
String file = path.substring(path.lastIndexOf("/")+1);
ftp.changeWorkingDirectory(workingDir);
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
InputStream input = ftp.retrieveFileStream(file);
ftp.quit();
ftp.disconnect();
InputStream input = new FilterInputStream(ftp.retrieveFileStream(file)) {
public void close() throws IOException {
in.close();
ftp.quit();
ftp.disconnect();
}
};
return input;
}
private void setUserInformation(String userInfo) {
if(userInfo != null) {
String[] userPass = userInfo.split(":");

View File

@ -27,7 +27,6 @@ import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.ftplet.UserManager;
@ -37,106 +36,116 @@ import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.jmock.Mockery;
public class FTPBlobDownloadStrategyTest extends TestCase {
private static final String ftpServerListenerName = "default";
private FtpServer server;
final static String userNamePass = "activemq";
Mockery context = null;
int ftpPort;
String ftpUrl;
protected void setUp() throws Exception {
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
ftpHomeDirFile.mkdirs();
ftpHomeDirFile.getParentFile().deleteOnExit();
Mockery context = null;
int ftpPort;
String ftpUrl;
FtpServerFactory serverFactory = new FtpServerFactory();
ListenerFactory factory = new ListenerFactory();
final int FILE_SIZE = Short.MAX_VALUE * 10;
PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
UserManager userManager = userManagerFactory.createUserManager();
protected void setUp() throws Exception {
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
ftpHomeDirFile.mkdirs();
ftpHomeDirFile.getParentFile().deleteOnExit();
BaseUser user = new BaseUser();
FtpServerFactory serverFactory = new FtpServerFactory();
ListenerFactory factory = new ListenerFactory();
PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
UserManager userManager = userManagerFactory.createUserManager();
BaseUser user = new BaseUser();
user.setName("activemq");
user.setPassword("activemq");
user.setHomeDirectory(ftpHomeDirFile.getParent());
userManager.save(user);
serverFactory.setUserManager(userManager);
factory.setPort(0);
serverFactory.addListener(ftpServerListenerName, factory
.createListener());
server = serverFactory.createServer();
server.start();
ftpPort = serverFactory.getListener(ftpServerListenerName)
.getPort();
serverFactory.setUserManager(userManager);
factory.setPort(0);
serverFactory.addListener(ftpServerListenerName, factory
.createListener());
server = serverFactory.createServer();
server.start();
ftpPort = serverFactory.getListener(ftpServerListenerName)
.getPort();
ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:"
+ ftpPort + "/ftptest/";
ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:"
+ ftpPort + "/ftptest/";
File uploadFile = new File(ftpHomeDirFile, "test.txt");
FileWriter wrt = new FileWriter(uploadFile);
wrt.write("hello world");
wrt.close();
File uploadFile = new File(ftpHomeDirFile, "test.txt");
FileWriter wrt = new FileWriter(uploadFile);
wrt.write("hello world");
for(int ix = 0; ix < FILE_SIZE; ++ix ) {
wrt.write("a");
}
wrt.close();
}
public void testDownload() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
InputStream stream;
try {
message.setURL(new URL(ftpUrl + "test.txt"));
stream = strategy.getInputStream(message);
int i = stream.read();
StringBuilder sb = new StringBuilder(10);
while(i != -1) {
sb.append((char)i);
i = stream.read();
}
Assert.assertEquals("hello world", sb.toString());
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
}
public void testWrongAuthentification() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage());
return;
} catch(Exception e) {
System.out.println(e);
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
public void testWrongFTPPort() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage());
return;
} catch(Exception e) {
e.printStackTrace();
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
public void testDownload() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
InputStream stream;
try {
message.setURL(new URL(ftpUrl + "test.txt"));
stream = strategy.getInputStream(message);
int i = stream.read();
StringBuilder sb = new StringBuilder(2048);
while(i != -1) {
sb.append((char)i);
i = stream.read();
}
Assert.assertEquals("hello world", sb.toString().substring(0, "hello world".length()));
Assert.assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length());
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
}
public void testWrongAuthentification() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage());
return;
} catch(Exception e) {
System.out.println(e);
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
public void testWrongFTPPort() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage());
return;
} catch(Exception e) {
e.printStackTrace();
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
}