git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@800235 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-08-03 07:42:43 +00:00
parent 8664315314
commit b08ae50743
14 changed files with 675 additions and 24 deletions

View File

@ -140,6 +140,11 @@
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<!-- not really a dependency at all - just added optionally to get the generator working -->
@ -460,6 +465,10 @@
<!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->
<exclude>**/ProxyConnectorTest.*</exclude>
<!-- FTPBlob tests need FTP server running -->
<exclude>**/FTPBlob*/</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -16,29 +16,12 @@
*/
package org.apache.activemq;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@ -57,6 +40,24 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
/**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@ -485,6 +486,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
}
if (transformer != null) {
Message transformedMessage = transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq;
import java.net.MalformedURLException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
@ -32,6 +33,9 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
@ -165,6 +169,17 @@ public final class ActiveMQMessageTransformation {
msg.setConnection(connection);
msg.setText(textMsg.getText());
activeMessage = msg;
} else if (message instanceof BlobMessage) {
BlobMessage blobMessage = (BlobMessage)message;
ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
msg.setConnection(connection);
msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
try {
msg.setURL(blobMessage.getURL());
} catch (MalformedURLException e) {
}
activeMessage = msg;
} else {
activeMessage = new ActiveMQMessage();
activeMessage.setConnection(connection);

View File

@ -42,6 +42,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.blob.BlobDownloader;
/**
* <P>
@ -410,6 +411,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
configureMessage(message);
message.setURL(url);
message.setDeletedByBroker(deletedByBroker);
message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
return message;
}
@ -430,6 +432,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
configureMessage(message);
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
message.setDeletedByBroker(true);
message.setName(file.getName());
return message;
@ -452,6 +455,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
configureMessage(message);
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
message.setDeletedByBroker(true);
return message;
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
* Represents a strategy of downloading a file/stream from some remote
*/
public interface BlobDownloadStrategy {
InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException;
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
* Mediator for Blob Download
*/
public class BlobDownloader {
private BlobTransferPolicy blobTransferPolicy;
public BlobDownloader(BlobTransferPolicy transferPolicy) {
this.blobTransferPolicy = transferPolicy;
}
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
return getStrategy().getInputStream(message);
}
public BlobTransferPolicy getBlobTransferPolicy() {
return blobTransferPolicy;
}
public BlobDownloadStrategy getStrategy() {
return getBlobTransferPolicy().getDownloadStrategy();
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.blob;
import java.net.MalformedURLException;
import java.net.URL;
/**
* The policy for configuring how BLOBs (Binary Large OBjects) are transferred
* out of band between producers, brokers and consumers.
@ -28,6 +31,7 @@ public class BlobTransferPolicy {
private String uploadUrl;
private int bufferSize = 128 * 1024;
private BlobUploadStrategy uploadStrategy;
private BlobDownloadStrategy downloadStrategy;
/**
* Returns a copy of this policy object
@ -90,6 +94,13 @@ public class BlobTransferPolicy {
return uploadStrategy;
}
public BlobDownloadStrategy getDownloadStrategy() {
if(downloadStrategy == null) {
downloadStrategy = createDownloadStrategy();
}
return downloadStrategy;
}
/**
* Sets the upload strategy to use for uploading BLOBs to some URL
*/
@ -108,7 +119,49 @@ public class BlobTransferPolicy {
this.bufferSize = bufferSize;
}
/**
* Returns the upload strategy depending on the information from the
* uploadURL. Currently supportet HTTP and FTP
*
* @return
*/
protected BlobUploadStrategy createUploadStrategy() {
return new DefaultBlobUploadStrategy(this);
BlobUploadStrategy strategy;
try {
URL url = new URL(getUploadUrl());
if(url.getProtocol().equalsIgnoreCase("FTP")) {
strategy = new FTPBlobUploadStrategy(this);
} else {
strategy = new DefaultBlobUploadStrategy(this);
}
} catch (MalformedURLException e) {
strategy = new DefaultBlobUploadStrategy(this);
}
return strategy;
}
/**
* Returns the download strategy depending on the information from the
* uploadURL. Currently supportet HTTP and FTP
*
* @return
*/
protected BlobDownloadStrategy createDownloadStrategy() {
BlobDownloadStrategy strategy;
try {
URL url = new URL(getUploadUrl());
if(url.getProtocol().equalsIgnoreCase("FTP")) {
strategy = new FTPBlobDownloadStrategy();
} else {
strategy = new DefaultBlobDownloadStrategy();
}
} catch (MalformedURLException e) {
strategy = new DefaultBlobDownloadStrategy();
}
return strategy;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
* A default implementation of {@link BlobDownloadStrategy} which uses the URL
* class to download files or streams from a remote URL
*/
public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
URL value = message.getURL();
if (value == null) {
return null;
}
return value.openStream();
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.net.ftp.FTPClient;
/**
* A FTP implementation for {@link BlobDownloadStrategy}.
*/
public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
private String ftpUser;
private String ftpPass;
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
URL url = message.getURL();
setUserInformation(url.getUserInfo());
String connectUrl = url.getHost();
int port = url.getPort() < 1 ? 21 : url.getPort();
FTPClient ftp = new FTPClient();
try {
ftp.connect(connectUrl, port);
} catch(ConnectException e) {
throw new JMSException("Problem connecting the FTP-server");
}
if(!ftp.login(ftpUser, ftpPass)) {
ftp.quit();
ftp.disconnect();
throw new JMSException("Cant Authentificate to FTP-Server");
}
String path = url.getPath();
String workingDir = path.substring(0, path.lastIndexOf("/"));
String file = path.substring(path.lastIndexOf("/")+1);
ftp.changeWorkingDirectory(workingDir);
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
InputStream input = ftp.retrieveFileStream(file);
ftp.quit();
ftp.disconnect();
return input;
}
private void setUserInformation(String userInfo) {
if(userInfo != null) {
String[] userPass = userInfo.split(":");
if(userPass.length > 0) this.ftpUser = userPass[0];
if(userPass.length > 1) this.ftpPass = userPass[1];
} else {
this.ftpUser = "anonymous";
this.ftpPass = "anonymous";
}
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.net.ftp.FTPClient;
/**
* A FTP implementation of {@link BlobUploadStrategy}.
*/
public class FTPBlobUploadStrategy implements BlobUploadStrategy {
private URL url;
private String ftpUser = "";
private String ftpPass = "";
private BlobTransferPolicy transferPolicy;
public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
this.transferPolicy = transferPolicy;
this.url = new URL(this.transferPolicy.getUploadUrl());
setUserInformation(url.getUserInfo());
}
public URL uploadFile(ActiveMQBlobMessage message, File file)
throws JMSException, IOException {
return uploadStream(message, new FileInputStream(file));
}
public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
throws JMSException, IOException {
String connectUrl = url.getHost();
int port = url.getPort() < 1 ? 21 : url.getPort();
FTPClient ftp = new FTPClient();
try {
ftp.connect(connectUrl, port);
} catch(ConnectException e) {
throw new JMSException("Problem connecting the FTP-server");
}
if(!ftp.login(ftpUser, ftpPass)) {
ftp.quit();
ftp.disconnect();
throw new JMSException("Cant Authentificate to FTP-Server");
}
String path = url.getPath();
String workingDir = path.substring(0, path.lastIndexOf("/"));
String filename = message.getMessageId().toString().replaceAll(":", "_");
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
String url;
if(!ftp.changeWorkingDirectory(workingDir)) {
url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/";
} else {
url = this.url.toString();
}
ftp.storeFile(filename, in);
ftp.quit();
ftp.disconnect();
return new URL(url + filename);
}
private void setUserInformation(String userInfo) {
if(userInfo != null) {
String[] userPass = userInfo.split(":");
if(userPass.length > 0) this.ftpUser = userPass[0];
if(userPass.length > 1) this.ftpPass = userPass[1];
} else {
this.ftpUser = "anonymous";
this.ftpPass = "anonymous";
}
}
}

View File

@ -24,6 +24,7 @@ import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.util.JMSExceptionSupport;
@ -44,6 +45,7 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
private boolean deletedByBroker;
private transient BlobUploader blobUploader;
private transient BlobDownloader blobDownloader;
private transient URL url;
public Message copy() {
@ -123,11 +125,10 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
}
public InputStream getInputStream() throws IOException, JMSException {
URL value = getURL();
if (value == null) {
if(blobDownloader == null) {
return null;
}
return value.openStream();
return blobDownloader.getInputStream(this);
}
public URL getURL() throws JMSException {
@ -154,6 +155,14 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
this.blobUploader = blobUploader;
}
public BlobDownloader getBlobDownloader() {
return blobDownloader;
}
public void setBlobDownloader(BlobDownloader blobDownloader) {
this.blobDownloader = blobDownloader;
}
public void onSend() throws JMSException {
super.onSend();

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.InputStream;
import java.net.URL;
import javax.jms.JMSException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
* To start this test make sure an ftp server is running with
* user: activemq and password: activemq.
* Also a file called test.txt with the content <b>hello world</b> must be in the ftptest directory.
*/
public class FTPBlobDownloadStrategyTest extends TestCase {
public void xtestDownload() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
InputStream stream;
try {
message.setURL(new URL("ftp://activemq:activemq@localhost/ftptest/test.txt"));
stream = strategy.getInputStream(message);
int i = stream.read();
StringBuilder sb = new StringBuilder(10);
while(i != -1) {
sb.append((char)i);
i = stream.read();
}
Assert.assertEquals("hello world", sb.toString());
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
}
public void xtestWrongAuthentification() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://activemq:activemq_wrong@localhost/ftptest/test.txt"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage());
return;
} catch(Exception e) {
System.out.println(e);
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
public void xtestWrongFTPPort() {
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
try {
message.setURL(new URL("ftp://activemq:activemq@localhost:442/ftptest/test.txt"));
strategy.getInputStream(message);
} catch(JMSException e) {
Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage());
return;
} catch(Exception e) {
e.printStackTrace();
Assert.assertTrue("Wrong Exception "+ e, false);
return;
}
Assert.assertTrue("Expect Exception", false);
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStream;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
* To start this test make sure an ftp server is running with
* user: activemq and password: activemq
*/
public class FTPBlobTest extends EmbeddedBrokerTestSupport {
private ActiveMQConnection connection;
protected void setUp() throws Exception {
bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activemq@localhost/ftptest/";
super.setUp();
connection = (ActiveMQConnection) createConnection();
connection.start();
}
public void testBlobFile() throws Exception {
// first create Message
File file = File.createTempFile("amq-data-file-", ".dat");
// lets write some data
String content = "hello world "+ System.currentTimeMillis();
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
writer.append(content);
writer.close();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
BlobMessage message = session.createBlobMessage(file);
producer.send(message);
Thread.sleep(1000);
// check message send
Message msg = consumer.receive(1000);
Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
StringBuilder b = new StringBuilder();
int i = input.read();
while(i != -1) {
b.append((char) i);
i = input.read();
}
Assert.assertEquals(content, b.toString());
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.blob;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.net.URL;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.MessageId;
import org.apache.commons.net.ftp.FTPClient;
/**
* To start this test make sure an ftp server is running with
* user: activemq and password: activemq
*/
public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport {
private Connection connection;
protected void setUp() throws Exception {
bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activemq@localhost/ftptest/";
super.setUp();
connection = createConnection();
connection.start();
// check if file exist and delete it
URL url = new URL("ftp://activemq:activemq@localhost/ftptest/");
String connectUrl = url.getHost();
int port = url.getPort() < 1 ? 21 : url.getPort();
FTPClient ftp = new FTPClient();
ftp.connect(connectUrl, port);
if(!ftp.login("activemq", "activemq")) {
ftp.quit();
ftp.disconnect();
throw new JMSException("Cant Authentificate to FTP-Server");
}
ftp.changeWorkingDirectory("ftptest");
ftp.deleteFile("testmessage");
ftp.quit();
ftp.disconnect();
}
public void testFileUpload() throws Exception {
File file = File.createTempFile("amq-data-file-", ".dat");
// lets write some data
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
writer.append("hello world");
writer.close();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
((ActiveMQConnection)connection).setCopyMessageOnSend(false);
ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession)session).createBlobMessage(file);
message.setMessageId(new MessageId("testmessage"));
message.onSend();
Assert.assertEquals("ftp://activemq:activemq@localhost/ftptest/testmessage", message.getURL().toString());
}
}