mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2667 - make fileserver jetty neutral
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@944278 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
af159dd82b
commit
eb6fec879f
|
@ -1,83 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
|
||||
public class DefaultBlobUploadStrategyTest extends TestCase {
|
||||
|
||||
private static final String FILESERVER_URL = "http://localhost:8080/";
|
||||
private static final String URI = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8080/";
|
||||
|
||||
public static void main(String[] args) {
|
||||
junit.textui.TestRunner.run(DefaultBlobUploadStrategyTest.class);
|
||||
}
|
||||
|
||||
public void testDummy() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
// DISABLED UNTIL WE EMBED JETTY
|
||||
public void xtestUploadViaDefaultBlobUploadStrategy() throws Exception {
|
||||
// 0. Initialise
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append("Hello World!");
|
||||
writer.close();
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URI);
|
||||
BlobTransferPolicy policy = factory.getBlobTransferPolicy();
|
||||
|
||||
ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
|
||||
msg.setMessageId(new MessageId());
|
||||
|
||||
// 1. Upload
|
||||
DefaultBlobUploadStrategy strategy = new DefaultBlobUploadStrategy(policy);
|
||||
strategy.uploadFile(msg, file);
|
||||
|
||||
// 2. Download
|
||||
msg.setURL(new URL(FILESERVER_URL + msg.getMessageId()));
|
||||
|
||||
InputStream in = msg.getInputStream();
|
||||
long bytesRead = 0;
|
||||
byte[] buffer = new byte[1024 * 1024];
|
||||
|
||||
while (true) {
|
||||
int c = in.read(buffer);
|
||||
if (c == -1) {
|
||||
break;
|
||||
}
|
||||
bytesRead += c;
|
||||
}
|
||||
in.close();
|
||||
TestCase.assertTrue(bytesRead == file.length());
|
||||
|
||||
// 3. Delete
|
||||
//strategy.deleteFile(msg);
|
||||
}
|
||||
|
||||
}
|
|
@ -87,19 +87,35 @@
|
|||
<artifactId>activemq-jaas</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- web container -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.aggregate</groupId>
|
||||
<artifactId>jetty-all-server</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-servlet_2.5_spec</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- used for testing -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${pom.groupId}</groupId>
|
||||
<artifactId>activemq-core</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.aggregate</groupId>
|
||||
<artifactId>jetty-all-server</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -38,8 +38,6 @@ import javax.servlet.http.HttpServletResponse;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.URIUtil;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -71,7 +69,7 @@ public class RestFilter implements Filter {
|
|||
}
|
||||
|
||||
private File locateFile(HttpServletRequest request) {
|
||||
return new File(filterConfig.getServletContext().getRealPath(URIUtil.addPaths(request.getServletPath(), request.getPathInfo())));
|
||||
return new File(filterConfig.getServletContext().getRealPath(request.getServletPath()), request.getPathInfo());
|
||||
}
|
||||
|
||||
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
|
||||
|
@ -122,8 +120,8 @@ public class RestFilter implements Filter {
|
|||
|
||||
try {
|
||||
URL destinationUrl = new URL(destination);
|
||||
IO.copyFile(file, new File(destinationUrl.getFile()));
|
||||
IO.delete(file);
|
||||
IOHelper.copyFile(file, new File(destinationUrl.getFile()));
|
||||
IOHelper.deleteFile(file);
|
||||
} catch (IOException e) {
|
||||
response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
|
||||
// could
|
||||
|
@ -178,7 +176,7 @@ public class RestFilter implements Filter {
|
|||
|
||||
FileOutputStream out = new FileOutputStream(file);
|
||||
try {
|
||||
IO.copy(request.getInputStream(), out);
|
||||
IOHelper.copyInputStream(request.getInputStream(), out);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception occured" , e);
|
||||
out.close();
|
||||
|
@ -207,7 +205,7 @@ public class RestFilter implements Filter {
|
|||
return;
|
||||
}
|
||||
|
||||
boolean success = IO.delete(file); // actual delete operation
|
||||
boolean success = IOHelper.deleteFile(file); // actual delete operation
|
||||
|
||||
if (success) {
|
||||
response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
package org.apache.activemq.util;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.BlobMessage;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
|
||||
public class HttpBlobTest extends HttpTestSupport {
|
||||
|
||||
public void testBlobFile() throws Exception {
|
||||
// first create Message
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
String content = "hello world " + System.currentTimeMillis();
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append(content);
|
||||
writer.close();
|
||||
|
||||
ActiveMQSession session = (ActiveMQSession) connection.createSession(
|
||||
false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
BlobMessage message = session.createBlobMessage(file);
|
||||
|
||||
producer.send(message);
|
||||
Thread.sleep(1000);
|
||||
|
||||
// check message send
|
||||
Message msg = consumer.receive(1000);
|
||||
Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
|
||||
|
||||
InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
|
||||
StringBuilder b = new StringBuilder();
|
||||
int i = input.read();
|
||||
while (i != -1) {
|
||||
b.append((char) i);
|
||||
i = input.read();
|
||||
}
|
||||
input.close();
|
||||
File uploaded = new File(homeDir, msg.getJMSMessageID().toString().replace(":", "_"));
|
||||
Assert.assertEquals(content, b.toString());
|
||||
assertTrue(uploaded.exists());
|
||||
((ActiveMQBlobMessage)msg).deleteFile();
|
||||
assertFalse(uploaded.exists());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
package org.apache.activemq.util;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.Socket;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.nio.SelectChannelConnector;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
|
||||
public abstract class HttpTestSupport extends TestCase {
|
||||
private static final Log LOG = LogFactory.getLog(HttpTestSupport.class);
|
||||
|
||||
BrokerService broker;
|
||||
Server server;
|
||||
ActiveMQConnectionFactory factory;
|
||||
Connection connection;
|
||||
Session session;
|
||||
MessageProducer producer;
|
||||
Destination destination;
|
||||
|
||||
protected boolean createBroker = true;
|
||||
|
||||
final File homeDir = new File("src/main/webapp/uploads/");
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
|
||||
server = new Server();
|
||||
SelectChannelConnector connector = new SelectChannelConnector();
|
||||
connector.setPort(8080);
|
||||
connector.setServer(server);
|
||||
WebAppContext context = new WebAppContext();
|
||||
|
||||
context.setResourceBase("src/main/webapp");
|
||||
context.setContextPath("/");
|
||||
context.setServer(server);
|
||||
server.setHandler(context);
|
||||
server.setConnectors(new Connector[] {
|
||||
connector
|
||||
});
|
||||
server.start();
|
||||
waitForJettySocketToAccept("http://localhost:8080");
|
||||
|
||||
if (createBroker) {
|
||||
broker = new BrokerService();
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(true);
|
||||
broker.addConnector("vm://localhost");
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
factory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
destination = session.createQueue("test");
|
||||
producer = session.createProducer(destination);
|
||||
|
||||
IOHelper.deleteFile(homeDir);
|
||||
homeDir.mkdir();
|
||||
}
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
server.stop();
|
||||
if (createBroker) {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
session.close();
|
||||
connection.close();
|
||||
IOHelper.deleteFile(homeDir);
|
||||
}
|
||||
}
|
||||
|
||||
public void waitForJettySocketToAccept(String bindLocation) throws Exception {
|
||||
final URL url = new URL(bindLocation);
|
||||
assertTrue("Jetty endpoint is available", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
public boolean isSatisified() throws Exception {
|
||||
boolean canConnect = false;
|
||||
try {
|
||||
Socket socket = SocketFactory.getDefault().createSocket(url.getHost(), url.getPort());
|
||||
socket.close();
|
||||
canConnect = true;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("verify jetty available, failed to connect to " + url + e);
|
||||
}
|
||||
return canConnect;
|
||||
}}, 60 * 1000));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -23,21 +23,17 @@ import java.io.OutputStream;
|
|||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.eclipse.jetty.util.IO;
|
||||
|
||||
public class RestFilterTest extends TestCase {
|
||||
public class RestFilterTest extends HttpTestSupport {
|
||||
|
||||
public RestFilterTest(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
protected boolean createBroker = false;
|
||||
|
||||
public void testFilter() throws Exception {
|
||||
byte[] fileContents = new byte[] {
|
||||
'a', 'b', 'c'
|
||||
};
|
||||
URL url = new URL("http://localhost:8080/fileserver/repository/file.txt");
|
||||
URL url = new URL("http://localhost:8080/uploads/file.txt");
|
||||
|
||||
// 1. upload
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
|
|
Loading…
Reference in New Issue