git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@943916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-05-13 15:38:03 +00:00
parent a34c80858e
commit 3af93f1898
4 changed files with 39 additions and 140 deletions

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Arrays;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -79,10 +80,11 @@ public class FTPBlobUploadStrategy implements BlobUploadStrategy {
url = this.url.toString(); url = this.url.toString();
} }
ftp.storeFile(filename, in); if (!ftp.storeFile(filename, in)) {
throw new JMSException("FTP store failed: " + ftp.getReplyString());
}
ftp.quit(); ftp.quit();
ftp.disconnect(); ftp.disconnect();
return new URL(url + filename); return new URL(url + filename);
} }

View File

@ -24,59 +24,17 @@ import java.net.URL;
import javax.jms.JMSException; import javax.jms.JMSException;
import junit.framework.Assert; import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.ftplet.UserManager;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.jmock.Mockery;
public class FTPBlobDownloadStrategyTest extends TestCase { public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
private static final String ftpServerListenerName = "default";
private FtpServer server;
final static String userNamePass = "activemq";
Mockery context = null;
int ftpPort;
String ftpUrl;
final int FILE_SIZE = Short.MAX_VALUE * 10; final int FILE_SIZE = Short.MAX_VALUE * 10;
protected void setUp() throws Exception { public void testDownload() throws Exception {
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); setConnection();
ftpHomeDirFile.mkdirs();
ftpHomeDirFile.getParentFile().deleteOnExit();
FtpServerFactory serverFactory = new FtpServerFactory();
ListenerFactory factory = new ListenerFactory();
PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
UserManager userManager = userManagerFactory.createUserManager();
BaseUser user = new BaseUser();
user.setName("activemq");
user.setPassword("activemq");
user.setHomeDirectory(ftpHomeDirFile.getParent());
userManager.save(user);
serverFactory.setUserManager(userManager);
factory.setPort(0);
serverFactory.addListener(ftpServerListenerName, factory
.createListener());
server = serverFactory.createServer();
server.start();
ftpPort = serverFactory.getListener(ftpServerListenerName)
.getPort();
ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:"
+ ftpPort + "/ftptest/";
// create file
File uploadFile = new File(ftpHomeDirFile, "test.txt"); File uploadFile = new File(ftpHomeDirFile, "test.txt");
FileWriter wrt = new FileWriter(uploadFile); FileWriter wrt = new FileWriter(uploadFile);
@ -88,9 +46,6 @@ public class FTPBlobDownloadStrategyTest extends TestCase {
wrt.close(); wrt.close();
}
public void testDownload() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage(); ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
InputStream stream; InputStream stream;

View File

@ -19,9 +19,7 @@ package org.apache.activemq.blob;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.net.URL;
import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
@ -29,95 +27,14 @@ import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.ftplet.AuthorizationRequest;
import org.apache.ftpserver.ftplet.User;
import org.apache.ftpserver.ftplet.UserManager;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
import org.apache.ftpserver.usermanager.UsernamePasswordAuthentication;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.api.Invocation;
import org.jmock.lib.action.CustomAction;
public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport { public class FTPBlobUploadStrategyTest extends FTPTestSupport {
private static final String ftpServerListenerName = "default";
private Connection connection;
private FtpServer server;
final static String userNamePass = "activemq";
Mockery context = null;
String ftpUrl;
protected void setUp() throws Exception {
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
ftpHomeDirFile.mkdirs();
ftpHomeDirFile.getParentFile().deleteOnExit();
FtpServerFactory serverFactory = new FtpServerFactory();
ListenerFactory factory = new ListenerFactory();
PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
UserManager userManager = userManagerFactory.createUserManager();
BaseUser user = new BaseUser();
user.setName("activemq");
user.setPassword("activemq");
user.setHomeDirectory(ftpHomeDirFile.getParent());
userManager.save(user);
serverFactory.setUserManager(userManager);
factory.setPort(0);
serverFactory.addListener(ftpServerListenerName, factory
.createListener());
server = serverFactory.createServer();
server.start();
int ftpPort = serverFactory.getListener(ftpServerListenerName)
.getPort();
ftpUrl = "ftp://"
+ userNamePass
+ ":"
+ userNamePass
+ "@localhost:"
+ ftpPort
+ "/ftptest/";
bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + ftpUrl;
super.setUp();
connection = createConnection();
connection.start();
// check if file exist and delete it
URL url = new URL(ftpUrl);
String connectUrl = url.getHost();
int port = url.getPort() < 1 ? 21 : url.getPort();
FTPClient ftp = new FTPClient();
ftp.connect(connectUrl, port);
if(!ftp.login("activemq", "activemq")) {
ftp.quit();
ftp.disconnect();
throw new JMSException("Cant Authentificate to FTP-Server");
}
ftp.changeWorkingDirectory("ftptest");
ftp.deleteFile("testmessage");
ftp.quit();
ftp.disconnect();
}
public void testFileUpload() throws Exception { public void testFileUpload() throws Exception {
setConnection();
File file = File.createTempFile("amq-data-file-", ".dat"); File file = File.createTempFile("amq-data-file-", ".dat");
// lets write some data // lets write some data
BufferedWriter writer = new BufferedWriter(new FileWriter(file)); BufferedWriter writer = new BufferedWriter(new FileWriter(file));
@ -131,6 +48,31 @@ public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport {
message.setMessageId(new MessageId("testmessage")); message.setMessageId(new MessageId("testmessage"));
message.onSend(); message.onSend();
Assert.assertEquals(ftpUrl + "testmessage", message.getURL().toString()); Assert.assertEquals(ftpUrl + "testmessage", message.getURL().toString());
File uploaded = new File(ftpHomeDirFile, "testmessage");
assertTrue("File doesn't exists", uploaded.exists());
}
public void testWriteDenied() throws Exception {
userNamePass = "guest";
setConnection();
File file = File.createTempFile("amq-data-file-", ".dat");
// lets write some data
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
writer.append("hello world");
writer.close();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
((ActiveMQConnection)connection).setCopyMessageOnSend(false);
ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession)session).createBlobMessage(file);
message.setMessageId(new MessageId("testmessage"));
try {
message.onSend();
} catch (JMSException e) {
e.printStackTrace();
return;
}
fail("Should have failed with permission denied exception!");
} }
} }

View File

@ -17,7 +17,7 @@ import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.apache.ftpserver.usermanager.impl.WritePermission; import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.jmock.Mockery; import org.jmock.Mockery;
public class FTPTestSupport extends EmbeddedBrokerTestSupport { public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
protected static final String ftpServerListenerName = "default"; protected static final String ftpServerListenerName = "default";
protected Connection connection; protected Connection connection;