mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2713 - delete blob message files from server
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@944167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dee584b99e
commit
af159dd82b
|
@ -27,5 +27,7 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
|
|||
public interface BlobDownloadStrategy {
|
||||
|
||||
InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException;
|
||||
|
||||
void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException;
|
||||
|
||||
}
|
||||
|
|
|
@ -37,6 +37,10 @@ public class BlobDownloader {
|
|||
return getStrategy().getInputStream(message);
|
||||
}
|
||||
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
getStrategy().deleteFile(message);
|
||||
}
|
||||
|
||||
public BlobTransferPolicy getBlobTransferPolicy() {
|
||||
return blobTransferPolicy;
|
||||
}
|
||||
|
|
|
@ -134,10 +134,10 @@ public class BlobTransferPolicy {
|
|||
strategy = new FTPBlobUploadStrategy(this);
|
||||
} else {
|
||||
strategy = new DefaultBlobUploadStrategy(this);
|
||||
}
|
||||
}
|
||||
} catch (MalformedURLException e) {
|
||||
strategy = new DefaultBlobUploadStrategy(this);
|
||||
}
|
||||
}
|
||||
return strategy;
|
||||
}
|
||||
|
||||
|
@ -153,12 +153,12 @@ public class BlobTransferPolicy {
|
|||
URL url = new URL(getUploadUrl());
|
||||
|
||||
if(url.getProtocol().equalsIgnoreCase("FTP")) {
|
||||
strategy = new FTPBlobDownloadStrategy();
|
||||
strategy = new FTPBlobDownloadStrategy(this);
|
||||
} else {
|
||||
strategy = new DefaultBlobDownloadStrategy();
|
||||
strategy = new DefaultBlobDownloadStrategy(this);
|
||||
}
|
||||
} catch (MalformedURLException e) {
|
||||
strategy = new DefaultBlobDownloadStrategy();
|
||||
strategy = new DefaultBlobDownloadStrategy(this);
|
||||
}
|
||||
return strategy;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.blob;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import javax.jms.JMSException;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
@ -26,7 +27,11 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
|
|||
* A default implementation of {@link BlobDownloadStrategy} which uses the URL
|
||||
* class to download files or streams from a remote URL
|
||||
*/
|
||||
public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
|
||||
public class DefaultBlobDownloadStrategy extends DefaultStrategy implements BlobDownloadStrategy {
|
||||
|
||||
public DefaultBlobDownloadStrategy(BlobTransferPolicy transferPolicy) {
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
URL value = message.getURL();
|
||||
|
@ -36,4 +41,18 @@ public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
|
|||
return value.openStream();
|
||||
}
|
||||
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
URL url = createMessageURL(message);
|
||||
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
connection.setRequestMethod("DELETE");
|
||||
connection.connect();
|
||||
connection.disconnect();
|
||||
|
||||
if (!isSuccessfulCode(connection.getResponseCode())) {
|
||||
throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " "
|
||||
+ connection.getResponseMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,11 +33,10 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
|
|||
* A default implementation of {@link BlobUploadStrategy} which uses the URL
|
||||
* class to upload files or streams to a remote URL
|
||||
*/
|
||||
public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
|
||||
private BlobTransferPolicy transferPolicy;
|
||||
public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUploadStrategy {
|
||||
|
||||
public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
|
||||
this.transferPolicy = transferPolicy;
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
|
||||
|
@ -45,7 +44,7 @@ public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
|
|||
}
|
||||
|
||||
public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
|
||||
URL url = createUploadURL(message);
|
||||
URL url = createMessageURL(message);
|
||||
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
connection.setRequestMethod("PUT");
|
||||
|
@ -74,25 +73,5 @@ public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
|
|||
return url;
|
||||
}
|
||||
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
URL url = createUploadURL(message);
|
||||
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
connection.setRequestMethod("DELETE");
|
||||
connection.connect();
|
||||
connection.disconnect();
|
||||
|
||||
if (!isSuccessfulCode(connection.getResponseCode())) {
|
||||
throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " "
|
||||
+ connection.getResponseMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSuccessfulCode(int responseCode) {
|
||||
return responseCode >= 200 && responseCode < 300; // 2xx => successful
|
||||
}
|
||||
|
||||
protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
|
||||
return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package org.apache.activemq.blob;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
public class DefaultStrategy {
|
||||
|
||||
protected BlobTransferPolicy transferPolicy;
|
||||
|
||||
public DefaultStrategy(BlobTransferPolicy transferPolicy) {
|
||||
this.transferPolicy = transferPolicy;
|
||||
}
|
||||
|
||||
protected boolean isSuccessfulCode(int responseCode) {
|
||||
return responseCode >= 200 && responseCode < 300; // 2xx => successful
|
||||
}
|
||||
|
||||
protected URL createMessageURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
|
||||
return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
|
||||
}
|
||||
|
||||
}
|
|
@ -16,11 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.FilterInputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.net.URL;
|
||||
import java.net.MalformedURLException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
|
@ -30,33 +29,18 @@ import org.apache.commons.net.ftp.FTPClient;
|
|||
/**
|
||||
* A FTP implementation for {@link BlobDownloadStrategy}.
|
||||
*/
|
||||
public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
|
||||
private String ftpUser;
|
||||
private String ftpPass;
|
||||
public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownloadStrategy {
|
||||
|
||||
public FTPBlobDownloadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
URL url = message.getURL();
|
||||
|
||||
setUserInformation(url.getUserInfo());
|
||||
String connectUrl = url.getHost();
|
||||
int port = url.getPort() < 1 ? 21 : url.getPort();
|
||||
|
||||
final FTPClient ftp = new FTPClient();
|
||||
try {
|
||||
ftp.connect(connectUrl, port);
|
||||
} catch(ConnectException e) {
|
||||
throw new JMSException("Problem connecting the FTP-server");
|
||||
}
|
||||
|
||||
if(!ftp.login(ftpUser, ftpPass)) {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
throw new JMSException("Cant Authentificate to FTP-Server");
|
||||
}
|
||||
url = message.getURL();
|
||||
final FTPClient ftp = createFTP();
|
||||
String path = url.getPath();
|
||||
String workingDir = path.substring(0, path.lastIndexOf("/"));
|
||||
String file = path.substring(path.lastIndexOf("/")+1);
|
||||
|
||||
String file = path.substring(path.lastIndexOf("/") + 1);
|
||||
ftp.changeWorkingDirectory(workingDir);
|
||||
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
|
||||
|
||||
|
@ -72,15 +56,20 @@ public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
|
|||
return input;
|
||||
}
|
||||
|
||||
private void setUserInformation(String userInfo) {
|
||||
if(userInfo != null) {
|
||||
String[] userPass = userInfo.split(":");
|
||||
if(userPass.length > 0) this.ftpUser = userPass[0];
|
||||
if(userPass.length > 1) this.ftpPass = userPass[1];
|
||||
} else {
|
||||
this.ftpUser = "anonymous";
|
||||
this.ftpPass = "anonymous";
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
url = message.getURL();
|
||||
final FTPClient ftp = createFTP();
|
||||
|
||||
String path = url.getPath();
|
||||
try {
|
||||
if (!ftp.deleteFile(path)) {
|
||||
throw new JMSException("Delete file failed: " + ftp.getReplyString());
|
||||
}
|
||||
} finally {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,10 +20,8 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
|
@ -33,70 +31,42 @@ import org.apache.commons.net.ftp.FTPClient;
|
|||
/**
|
||||
* A FTP implementation of {@link BlobUploadStrategy}.
|
||||
*/
|
||||
public class FTPBlobUploadStrategy implements BlobUploadStrategy {
|
||||
|
||||
private URL url;
|
||||
private String ftpUser = "";
|
||||
private String ftpPass = "";
|
||||
private BlobTransferPolicy transferPolicy;
|
||||
public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStrategy {
|
||||
|
||||
public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
this.transferPolicy = transferPolicy;
|
||||
this.url = new URL(this.transferPolicy.getUploadUrl());
|
||||
|
||||
setUserInformation(url.getUserInfo());
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
public URL uploadFile(ActiveMQBlobMessage message, File file)
|
||||
throws JMSException, IOException {
|
||||
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
|
||||
return uploadStream(message, new FileInputStream(file));
|
||||
}
|
||||
|
||||
public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
|
||||
throws JMSException, IOException {
|
||||
String connectUrl = url.getHost();
|
||||
int port = url.getPort() < 1 ? 21 : url.getPort();
|
||||
|
||||
FTPClient ftp = createFTP();
|
||||
try {
|
||||
String path = url.getPath();
|
||||
String workingDir = path.substring(0, path.lastIndexOf("/"));
|
||||
String filename = message.getMessageId().toString().replaceAll(":", "_");
|
||||
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
|
||||
|
||||
String url;
|
||||
if(!ftp.changeWorkingDirectory(workingDir)) {
|
||||
url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/";
|
||||
} else {
|
||||
url = this.url.toString();
|
||||
}
|
||||
|
||||
if (!ftp.storeFile(filename, in)) {
|
||||
throw new JMSException("FTP store failed: " + ftp.getReplyString());
|
||||
}
|
||||
return new URL(url + filename);
|
||||
} finally {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
}
|
||||
|
||||
FTPClient ftp = new FTPClient();
|
||||
try {
|
||||
ftp.connect(connectUrl, port);
|
||||
} catch(ConnectException e) {
|
||||
throw new JMSException("Problem connecting the FTP-server");
|
||||
}
|
||||
if(!ftp.login(ftpUser, ftpPass)) {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
throw new JMSException("Cant Authentificate to FTP-Server");
|
||||
}
|
||||
String path = url.getPath();
|
||||
String workingDir = path.substring(0, path.lastIndexOf("/"));
|
||||
String filename = message.getMessageId().toString().replaceAll(":", "_");
|
||||
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
|
||||
|
||||
String url;
|
||||
if(!ftp.changeWorkingDirectory(workingDir)) {
|
||||
url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/";
|
||||
} else {
|
||||
url = this.url.toString();
|
||||
}
|
||||
|
||||
if (!ftp.storeFile(filename, in)) {
|
||||
throw new JMSException("FTP store failed: " + ftp.getReplyString());
|
||||
}
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
return new URL(url + filename);
|
||||
}
|
||||
|
||||
private void setUserInformation(String userInfo) {
|
||||
if(userInfo != null) {
|
||||
String[] userPass = userInfo.split(":");
|
||||
if(userPass.length > 0) this.ftpUser = userPass[0];
|
||||
if(userPass.length > 1) this.ftpPass = userPass[1];
|
||||
} else {
|
||||
this.ftpUser = "anonymous";
|
||||
this.ftpPass = "anonymous";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
|
||||
public class FTPStrategy {
|
||||
|
||||
protected BlobTransferPolicy transferPolicy;
|
||||
protected URL url;
|
||||
protected String ftpUser = "";
|
||||
protected String ftpPass = "";
|
||||
|
||||
public FTPStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
this.transferPolicy = transferPolicy;
|
||||
this.url = new URL(this.transferPolicy.getUploadUrl());
|
||||
}
|
||||
|
||||
protected void setUserInformation(String userInfo) {
|
||||
if(userInfo != null) {
|
||||
String[] userPass = userInfo.split(":");
|
||||
if(userPass.length > 0) this.ftpUser = userPass[0];
|
||||
if(userPass.length > 1) this.ftpPass = userPass[1];
|
||||
} else {
|
||||
this.ftpUser = "anonymous";
|
||||
this.ftpPass = "anonymous";
|
||||
}
|
||||
}
|
||||
|
||||
protected FTPClient createFTP() throws IOException, JMSException {
|
||||
String connectUrl = url.getHost();
|
||||
setUserInformation(url.getUserInfo());
|
||||
int port = url.getPort() < 1 ? 21 : url.getPort();
|
||||
|
||||
FTPClient ftp = new FTPClient();
|
||||
try {
|
||||
ftp.connect(connectUrl, port);
|
||||
} catch(ConnectException e) {
|
||||
throw new JMSException("Problem connecting the FTP-server");
|
||||
}
|
||||
if(!ftp.login(ftpUser, ftpPass)) {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
throw new JMSException("Cant Authentificate to FTP-Server");
|
||||
}
|
||||
return ftp;
|
||||
}
|
||||
|
||||
}
|
|
@ -177,4 +177,8 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteFile() throws IOException, JMSException {
|
||||
blobDownloader.deleteFile(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class DefaultBlobUploadStrategyTest extends TestCase {
|
|||
TestCase.assertTrue(bytesRead == file.length());
|
||||
|
||||
// 3. Delete
|
||||
strategy.deleteFile(msg);
|
||||
//strategy.deleteFile(msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.blob;
|
|||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
@ -47,7 +48,7 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
|||
wrt.close();
|
||||
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
||||
InputStream stream;
|
||||
try {
|
||||
message.setURL(new URL(ftpUrl + "test.txt"));
|
||||
|
@ -61,15 +62,19 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
|||
Assert.assertEquals("hello world", sb.toString().substring(0, "hello world".length()));
|
||||
Assert.assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length());
|
||||
|
||||
assertTrue(uploadFile.exists());
|
||||
strategy.deleteFile(message);
|
||||
assertFalse(uploadFile.exists());
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
public void testWrongAuthentification() {
|
||||
public void testWrongAuthentification() throws MalformedURLException {
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
||||
try {
|
||||
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
|
||||
strategy.getInputStream(message);
|
||||
|
@ -85,9 +90,9 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
|||
Assert.assertTrue("Expect Exception", false);
|
||||
}
|
||||
|
||||
public void testWrongFTPPort() {
|
||||
public void testWrongFTPPort() throws MalformedURLException {
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
||||
try {
|
||||
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
|
||||
strategy.getInputStream(message);
|
||||
|
|
|
@ -66,7 +66,11 @@ public class FTPBlobTest extends FTPTestSupport {
|
|||
i = input.read();
|
||||
}
|
||||
input.close();
|
||||
File uploaded = new File(ftpHomeDirFile, msg.getJMSMessageID().toString().replace(":", "_"));
|
||||
Assert.assertEquals(content, b.toString());
|
||||
assertTrue(uploaded.exists());
|
||||
((ActiveMQBlobMessage)msg).deleteFile();
|
||||
assertFalse(uploaded.exists());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.util.List;
|
|||
import javax.jms.Connection;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.ftpserver.FtpServer;
|
||||
import org.apache.ftpserver.FtpServerFactory;
|
||||
import org.apache.ftpserver.ftplet.Authority;
|
||||
|
@ -33,7 +34,7 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
|
|||
protected void setUp() throws Exception {
|
||||
|
||||
if (ftpHomeDirFile.getParentFile().exists()) {
|
||||
ftpHomeDirFile.getParentFile().delete();
|
||||
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
|
||||
}
|
||||
ftpHomeDirFile.mkdirs();
|
||||
ftpHomeDirFile.getParentFile().deleteOnExit();
|
||||
|
@ -99,6 +100,7 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
|
|||
if (server != null) {
|
||||
server.stop();
|
||||
}
|
||||
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue