diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java index 4d3d171347..b33e9a8fde 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java @@ -27,5 +27,7 @@ import org.apache.activemq.command.ActiveMQBlobMessage; public interface BlobDownloadStrategy { InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException; + + void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java b/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java index 8be6cc64a3..366c7afecf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java @@ -37,6 +37,10 @@ public class BlobDownloader { return getStrategy().getInputStream(message); } + public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException { + getStrategy().deleteFile(message); + } + public BlobTransferPolicy getBlobTransferPolicy() { return blobTransferPolicy; } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java b/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java index cb82bff073..1fcd16b40c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java @@ -134,10 +134,10 @@ public class BlobTransferPolicy { strategy = new FTPBlobUploadStrategy(this); } else { strategy = new DefaultBlobUploadStrategy(this); - } + } } catch (MalformedURLException e) { strategy = new DefaultBlobUploadStrategy(this); -} + } return strategy; } @@ -153,12 +153,12 @@ public class BlobTransferPolicy { URL url = new URL(getUploadUrl()); if(url.getProtocol().equalsIgnoreCase("FTP")) { - strategy = new FTPBlobDownloadStrategy(); + strategy = new FTPBlobDownloadStrategy(this); } else { - strategy = new DefaultBlobDownloadStrategy(); + strategy = new DefaultBlobDownloadStrategy(this); } } catch (MalformedURLException e) { - strategy = new DefaultBlobDownloadStrategy(); + strategy = new DefaultBlobDownloadStrategy(this); } return strategy; } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java index 1ab48babf7..56695c78d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java @@ -18,6 +18,7 @@ package org.apache.activemq.blob; import java.io.IOException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URL; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQBlobMessage; @@ -26,7 +27,11 @@ import org.apache.activemq.command.ActiveMQBlobMessage; * A default implementation of {@link BlobDownloadStrategy} which uses the URL * class to download files or streams from a remote URL */ -public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{ +public class DefaultBlobDownloadStrategy extends DefaultStrategy implements BlobDownloadStrategy { + + public DefaultBlobDownloadStrategy(BlobTransferPolicy transferPolicy) { + super(transferPolicy); + } public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException { URL value = message.getURL(); @@ -36,4 +41,18 @@ public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{ return value.openStream(); } + public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException { + URL url = createMessageURL(message); + + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("DELETE"); + connection.connect(); + connection.disconnect(); + + if (!isSuccessfulCode(connection.getResponseCode())) { + throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " " + + connection.getResponseMessage()); + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java index 164fd82a23..12c98a3913 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java @@ -33,11 +33,10 @@ import org.apache.activemq.command.ActiveMQBlobMessage; * A default implementation of {@link BlobUploadStrategy} which uses the URL * class to upload files or streams to a remote URL */ -public class DefaultBlobUploadStrategy implements BlobUploadStrategy { - private BlobTransferPolicy transferPolicy; +public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUploadStrategy { public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) { - this.transferPolicy = transferPolicy; + super(transferPolicy); } public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { @@ -45,7 +44,7 @@ public class DefaultBlobUploadStrategy implements BlobUploadStrategy { } public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException { - URL url = createUploadURL(message); + URL url = createMessageURL(message); HttpURLConnection connection = (HttpURLConnection)url.openConnection(); connection.setRequestMethod("PUT"); @@ -74,25 +73,5 @@ public class DefaultBlobUploadStrategy implements BlobUploadStrategy { return url; } - public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException { - URL url = createUploadURL(message); - HttpURLConnection connection = (HttpURLConnection)url.openConnection(); - connection.setRequestMethod("DELETE"); - connection.connect(); - connection.disconnect(); - - if (!isSuccessfulCode(connection.getResponseCode())) { - throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " " - + connection.getResponseMessage()); - } - } - - private boolean isSuccessfulCode(int responseCode) { - return responseCode >= 200 && responseCode < 300; // 2xx => successful - } - - protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException { - return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString()); - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/DefaultStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultStrategy.java new file mode 100644 index 0000000000..80c33b7685 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/blob/DefaultStrategy.java @@ -0,0 +1,26 @@ +package org.apache.activemq.blob; + +import java.net.MalformedURLException; +import java.net.URL; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBlobMessage; + +public class DefaultStrategy { + + protected BlobTransferPolicy transferPolicy; + + public DefaultStrategy(BlobTransferPolicy transferPolicy) { + this.transferPolicy = transferPolicy; + } + + protected boolean isSuccessfulCode(int responseCode) { + return responseCode >= 200 && responseCode < 300; // 2xx => successful + } + + protected URL createMessageURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException { + return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString()); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java index 6f31a79d24..4962e4e925 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java @@ -16,11 +16,10 @@ */ package org.apache.activemq.blob; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.FilterInputStream; -import java.net.ConnectException; -import java.net.URL; +import java.net.MalformedURLException; import javax.jms.JMSException; @@ -30,33 +29,18 @@ import org.apache.commons.net.ftp.FTPClient; /** * A FTP implementation for {@link BlobDownloadStrategy}. */ -public class FTPBlobDownloadStrategy implements BlobDownloadStrategy { - private String ftpUser; - private String ftpPass; +public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownloadStrategy { + + public FTPBlobDownloadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException { + super(transferPolicy); + } public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException { - URL url = message.getURL(); - - setUserInformation(url.getUserInfo()); - String connectUrl = url.getHost(); - int port = url.getPort() < 1 ? 21 : url.getPort(); - - final FTPClient ftp = new FTPClient(); - try { - ftp.connect(connectUrl, port); - } catch(ConnectException e) { - throw new JMSException("Problem connecting the FTP-server"); - } - - if(!ftp.login(ftpUser, ftpPass)) { - ftp.quit(); - ftp.disconnect(); - throw new JMSException("Cant Authentificate to FTP-Server"); - } + url = message.getURL(); + final FTPClient ftp = createFTP(); String path = url.getPath(); String workingDir = path.substring(0, path.lastIndexOf("/")); - String file = path.substring(path.lastIndexOf("/")+1); - + String file = path.substring(path.lastIndexOf("/") + 1); ftp.changeWorkingDirectory(workingDir); ftp.setFileType(FTPClient.BINARY_FILE_TYPE); @@ -72,15 +56,20 @@ public class FTPBlobDownloadStrategy implements BlobDownloadStrategy { return input; } - private void setUserInformation(String userInfo) { - if(userInfo != null) { - String[] userPass = userInfo.split(":"); - if(userPass.length > 0) this.ftpUser = userPass[0]; - if(userPass.length > 1) this.ftpPass = userPass[1]; - } else { - this.ftpUser = "anonymous"; - this.ftpPass = "anonymous"; + public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException { + url = message.getURL(); + final FTPClient ftp = createFTP(); + + String path = url.getPath(); + try { + if (!ftp.deleteFile(path)) { + throw new JMSException("Delete file failed: " + ftp.getReplyString()); + } + } finally { + ftp.quit(); + ftp.disconnect(); } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java index f9f55bb792..9ce52bd253 100644 --- a/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java @@ -20,10 +20,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.ConnectException; import java.net.MalformedURLException; import java.net.URL; -import java.util.Arrays; import javax.jms.JMSException; @@ -33,70 +31,42 @@ import org.apache.commons.net.ftp.FTPClient; /** * A FTP implementation of {@link BlobUploadStrategy}. */ -public class FTPBlobUploadStrategy implements BlobUploadStrategy { - - private URL url; - private String ftpUser = ""; - private String ftpPass = ""; - private BlobTransferPolicy transferPolicy; +public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStrategy { public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException { - this.transferPolicy = transferPolicy; - this.url = new URL(this.transferPolicy.getUploadUrl()); - - setUserInformation(url.getUserInfo()); + super(transferPolicy); } - public URL uploadFile(ActiveMQBlobMessage message, File file) - throws JMSException, IOException { + public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { return uploadStream(message, new FileInputStream(file)); } public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException { - String connectUrl = url.getHost(); - int port = url.getPort() < 1 ? 21 : url.getPort(); + + FTPClient ftp = createFTP(); + try { + String path = url.getPath(); + String workingDir = path.substring(0, path.lastIndexOf("/")); + String filename = message.getMessageId().toString().replaceAll(":", "_"); + ftp.setFileType(FTPClient.BINARY_FILE_TYPE); + + String url; + if(!ftp.changeWorkingDirectory(workingDir)) { + url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/"; + } else { + url = this.url.toString(); + } + + if (!ftp.storeFile(filename, in)) { + throw new JMSException("FTP store failed: " + ftp.getReplyString()); + } + return new URL(url + filename); + } finally { + ftp.quit(); + ftp.disconnect(); + } - FTPClient ftp = new FTPClient(); - try { - ftp.connect(connectUrl, port); - } catch(ConnectException e) { - throw new JMSException("Problem connecting the FTP-server"); - } - if(!ftp.login(ftpUser, ftpPass)) { - ftp.quit(); - ftp.disconnect(); - throw new JMSException("Cant Authentificate to FTP-Server"); - } - String path = url.getPath(); - String workingDir = path.substring(0, path.lastIndexOf("/")); - String filename = message.getMessageId().toString().replaceAll(":", "_"); - ftp.setFileType(FTPClient.BINARY_FILE_TYPE); - - String url; - if(!ftp.changeWorkingDirectory(workingDir)) { - url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/"; - } else { - url = this.url.toString(); - } - - if (!ftp.storeFile(filename, in)) { - throw new JMSException("FTP store failed: " + ftp.getReplyString()); - } - ftp.quit(); - ftp.disconnect(); - return new URL(url + filename); - } - - private void setUserInformation(String userInfo) { - if(userInfo != null) { - String[] userPass = userInfo.split(":"); - if(userPass.length > 0) this.ftpUser = userPass[0]; - if(userPass.length > 1) this.ftpPass = userPass[1]; - } else { - this.ftpUser = "anonymous"; - this.ftpPass = "anonymous"; - } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/blob/FTPStrategy.java b/activemq-core/src/main/java/org/apache/activemq/blob/FTPStrategy.java new file mode 100644 index 0000000000..fef406787a --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/blob/FTPStrategy.java @@ -0,0 +1,55 @@ +package org.apache.activemq.blob; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.MalformedURLException; +import java.net.URL; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.commons.net.ftp.FTPClient; + +public class FTPStrategy { + + protected BlobTransferPolicy transferPolicy; + protected URL url; + protected String ftpUser = ""; + protected String ftpPass = ""; + + public FTPStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException { + this.transferPolicy = transferPolicy; + this.url = new URL(this.transferPolicy.getUploadUrl()); + } + + protected void setUserInformation(String userInfo) { + if(userInfo != null) { + String[] userPass = userInfo.split(":"); + if(userPass.length > 0) this.ftpUser = userPass[0]; + if(userPass.length > 1) this.ftpPass = userPass[1]; + } else { + this.ftpUser = "anonymous"; + this.ftpPass = "anonymous"; + } + } + + protected FTPClient createFTP() throws IOException, JMSException { + String connectUrl = url.getHost(); + setUserInformation(url.getUserInfo()); + int port = url.getPort() < 1 ? 21 : url.getPort(); + + FTPClient ftp = new FTPClient(); + try { + ftp.connect(connectUrl, port); + } catch(ConnectException e) { + throw new JMSException("Problem connecting the FTP-server"); + } + if(!ftp.login(ftpUser, ftpPass)) { + ftp.quit(); + ftp.disconnect(); + throw new JMSException("Cant Authentificate to FTP-Server"); + } + return ftp; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java index 43a194d4ec..77375ff923 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java @@ -177,4 +177,8 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage } } } + + public void deleteFile() throws IOException, JMSException { + blobDownloader.deleteFile(this); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/blob/DefaultBlobUploadStrategyTest.java b/activemq-core/src/test/java/org/apache/activemq/blob/DefaultBlobUploadStrategyTest.java index 51558189d4..cd7f7daabd 100755 --- a/activemq-core/src/test/java/org/apache/activemq/blob/DefaultBlobUploadStrategyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/blob/DefaultBlobUploadStrategyTest.java @@ -77,7 +77,7 @@ public class DefaultBlobUploadStrategyTest extends TestCase { TestCase.assertTrue(bytesRead == file.length()); // 3. Delete - strategy.deleteFile(msg); + //strategy.deleteFile(msg); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java index e4b1deb027..bc3ab76e7e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.blob; import java.io.File; import java.io.FileWriter; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URL; import javax.jms.JMSException; @@ -47,7 +48,7 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport { wrt.close(); ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); InputStream stream; try { message.setURL(new URL(ftpUrl + "test.txt")); @@ -61,15 +62,19 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport { Assert.assertEquals("hello world", sb.toString().substring(0, "hello world".length())); Assert.assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length()); + assertTrue(uploadFile.exists()); + strategy.deleteFile(message); + assertFalse(uploadFile.exists()); + } catch (Exception e) { e.printStackTrace(); Assert.assertTrue(false); } } - public void testWrongAuthentification() { + public void testWrongAuthentification() throws MalformedURLException { ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); try { message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); strategy.getInputStream(message); @@ -85,9 +90,9 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport { Assert.assertTrue("Expect Exception", false); } - public void testWrongFTPPort() { + public void testWrongFTPPort() throws MalformedURLException { ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); try { message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); strategy.getInputStream(message); diff --git a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java index c47f62e968..201ce58a0d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java @@ -66,7 +66,11 @@ public class FTPBlobTest extends FTPTestSupport { i = input.read(); } input.close(); + File uploaded = new File(ftpHomeDirFile, msg.getJMSMessageID().toString().replace(":", "_")); Assert.assertEquals(content, b.toString()); + assertTrue(uploaded.exists()); + ((ActiveMQBlobMessage)msg).deleteFile(); + assertFalse(uploaded.exists()); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/blob/FTPTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/blob/FTPTestSupport.java index e2c60a2b4a..cd15cd20b8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/blob/FTPTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/blob/FTPTestSupport.java @@ -7,6 +7,7 @@ import java.util.List; import javax.jms.Connection; import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.util.IOHelper; import org.apache.ftpserver.FtpServer; import org.apache.ftpserver.FtpServerFactory; import org.apache.ftpserver.ftplet.Authority; @@ -33,7 +34,7 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport { protected void setUp() throws Exception { if (ftpHomeDirFile.getParentFile().exists()) { - ftpHomeDirFile.getParentFile().delete(); + IOHelper.deleteFile(ftpHomeDirFile.getParentFile()); } ftpHomeDirFile.mkdirs(); ftpHomeDirFile.getParentFile().deleteOnExit(); @@ -99,6 +100,7 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport { if (server != null) { server.stop(); } + IOHelper.deleteFile(ftpHomeDirFile.getParentFile()); }