From a4384966453337d73f098ad5bb59b0fd11fd5d44 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 21 Jan 2010 23:02:28 +0000 Subject: [PATCH] fix for https://issues.apache.org/activemq/browse/AMQ-2548 don't close the ftp connection until the stream is closed. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901911 13f79535-47bb-0310-9956-ffa450edef68 --- .../blob/FTPBlobDownloadStrategy.java | 31 +-- .../blob/FTPBlobDownloadStrategyTest.java | 181 +++++++++--------- 2 files changed, 114 insertions(+), 98 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java index 5490dc5d28..6f31a79d24 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java @@ -18,6 +18,7 @@ package org.apache.activemq.blob; import java.io.IOException; import java.io.InputStream; +import java.io.FilterInputStream; import java.net.ConnectException; import java.net.URL; @@ -35,36 +36,42 @@ public class FTPBlobDownloadStrategy implements BlobDownloadStrategy { public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException { URL url = message.getURL(); - + setUserInformation(url.getUserInfo()); String connectUrl = url.getHost(); int port = url.getPort() < 1 ? 21 : url.getPort(); - FTPClient ftp = new FTPClient(); + final FTPClient ftp = new FTPClient(); try { - ftp.connect(connectUrl, port); + ftp.connect(connectUrl, port); } catch(ConnectException e) { - throw new JMSException("Problem connecting the FTP-server"); + throw new JMSException("Problem connecting the FTP-server"); } - + if(!ftp.login(ftpUser, ftpPass)) { - ftp.quit(); + ftp.quit(); ftp.disconnect(); throw new JMSException("Cant Authentificate to FTP-Server"); } String path = url.getPath(); String workingDir = path.substring(0, path.lastIndexOf("/")); String file = path.substring(path.lastIndexOf("/")+1); - + ftp.changeWorkingDirectory(workingDir); ftp.setFileType(FTPClient.BINARY_FILE_TYPE); - InputStream input = ftp.retrieveFileStream(file); - ftp.quit(); - ftp.disconnect(); - + + InputStream input = new FilterInputStream(ftp.retrieveFileStream(file)) { + + public void close() throws IOException { + in.close(); + ftp.quit(); + ftp.disconnect(); + } + }; + return input; } - + private void setUserInformation(String userInfo) { if(userInfo != null) { String[] userPass = userInfo.split(":"); diff --git a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java index a35265eafa..ea1d4fa278 100644 --- a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java @@ -27,7 +27,6 @@ import junit.framework.Assert; import junit.framework.TestCase; import org.apache.activemq.command.ActiveMQBlobMessage; -import org.apache.commons.net.ftp.FTPClient; import org.apache.ftpserver.FtpServer; import org.apache.ftpserver.FtpServerFactory; import org.apache.ftpserver.ftplet.UserManager; @@ -37,106 +36,116 @@ import org.apache.ftpserver.usermanager.impl.BaseUser; import org.jmock.Mockery; public class FTPBlobDownloadStrategyTest extends TestCase { - + private static final String ftpServerListenerName = "default"; private FtpServer server; final static String userNamePass = "activemq"; - Mockery context = null; - int ftpPort; - String ftpUrl; - - protected void setUp() throws Exception { - final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); - ftpHomeDirFile.mkdirs(); - ftpHomeDirFile.getParentFile().deleteOnExit(); + Mockery context = null; + int ftpPort; + String ftpUrl; - FtpServerFactory serverFactory = new FtpServerFactory(); - ListenerFactory factory = new ListenerFactory(); + final int FILE_SIZE = Short.MAX_VALUE * 10; - PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory(); - UserManager userManager = userManagerFactory.createUserManager(); + protected void setUp() throws Exception { + final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); + ftpHomeDirFile.mkdirs(); + ftpHomeDirFile.getParentFile().deleteOnExit(); - BaseUser user = new BaseUser(); + FtpServerFactory serverFactory = new FtpServerFactory(); + ListenerFactory factory = new ListenerFactory(); + + PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory(); + UserManager userManager = userManagerFactory.createUserManager(); + + BaseUser user = new BaseUser(); user.setName("activemq"); user.setPassword("activemq"); user.setHomeDirectory(ftpHomeDirFile.getParent()); - + userManager.save(user); - serverFactory.setUserManager(userManager); - factory.setPort(0); - serverFactory.addListener(ftpServerListenerName, factory - .createListener()); - server = serverFactory.createServer(); - server.start(); - ftpPort = serverFactory.getListener(ftpServerListenerName) - .getPort(); + serverFactory.setUserManager(userManager); + factory.setPort(0); + serverFactory.addListener(ftpServerListenerName, factory + .createListener()); + server = serverFactory.createServer(); + server.start(); + ftpPort = serverFactory.getListener(ftpServerListenerName) + .getPort(); - ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:" - + ftpPort + "/ftptest/"; + ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + + ftpPort + "/ftptest/"; - File uploadFile = new File(ftpHomeDirFile, "test.txt"); - FileWriter wrt = new FileWriter(uploadFile); - wrt.write("hello world"); - wrt.close(); + File uploadFile = new File(ftpHomeDirFile, "test.txt"); + FileWriter wrt = new FileWriter(uploadFile); + + wrt.write("hello world"); + + for(int ix = 0; ix < FILE_SIZE; ++ix ) { + wrt.write("a"); + } + + wrt.close(); } - - public void testDownload() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - InputStream stream; - try { - message.setURL(new URL(ftpUrl + "test.txt")); - stream = strategy.getInputStream(message); - int i = stream.read(); - StringBuilder sb = new StringBuilder(10); - while(i != -1) { - sb.append((char)i); - i = stream.read(); - } - Assert.assertEquals("hello world", sb.toString()); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue(false); - } - } - - public void testWrongAuthentification() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - try { - message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); - strategy.getInputStream(message); - } catch(JMSException e) { - Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage()); - return; - } catch(Exception e) { - System.out.println(e); - Assert.assertTrue("Wrong Exception "+ e, false); - return; - } - - Assert.assertTrue("Expect Exception", false); - } - - public void testWrongFTPPort() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - try { - message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); - strategy.getInputStream(message); - } catch(JMSException e) { - Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage()); - return; - } catch(Exception e) { - e.printStackTrace(); - Assert.assertTrue("Wrong Exception "+ e, false); - return; - } - - Assert.assertTrue("Expect Exception", false); - } + + public void testDownload() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + InputStream stream; + try { + message.setURL(new URL(ftpUrl + "test.txt")); + stream = strategy.getInputStream(message); + int i = stream.read(); + StringBuilder sb = new StringBuilder(2048); + while(i != -1) { + sb.append((char)i); + i = stream.read(); + } + Assert.assertEquals("hello world", sb.toString().substring(0, "hello world".length())); + Assert.assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length()); + + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + } + + public void testWrongAuthentification() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + try { + message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage()); + return; + } catch(Exception e) { + System.out.println(e); + Assert.assertTrue("Wrong Exception "+ e, false); + return; + } + + Assert.assertTrue("Expect Exception", false); + } + + public void testWrongFTPPort() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + try { + message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage()); + return; + } catch(Exception e) { + e.printStackTrace(); + Assert.assertTrue("Wrong Exception "+ e, false); + return; + } + + Assert.assertTrue("Expect Exception", false); + } }