mirror of
https://github.com/apache/activemq.git
synced 2025-02-17 15:35:36 +00:00
applied the patch from Aleksi Kallio for the DefaultBlobUploadStrategy and DefaultBlobUploadStrategyTest with thanks! For AMQ-1075
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@523332 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a58a92d302
commit
5a521b468a
@ -16,17 +16,18 @@
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
/**
|
||||
* A default implementation of {@link BlobUploadStrategy} which uses the URL class to upload
|
||||
@ -46,33 +47,51 @@ public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
|
||||
public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
|
||||
URL url = createUploadURL(message);
|
||||
|
||||
URLConnection connection = url.openConnection();
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
connection.setRequestMethod("PUT");
|
||||
connection.setDoOutput(true);
|
||||
|
||||
// use chunked mode or otherwise URLConnection loads everything into memory
|
||||
// (chunked mode not supported before JRE 1.5)
|
||||
connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
|
||||
|
||||
OutputStream os = connection.getOutputStream();
|
||||
|
||||
byte[] buf = new byte[transferPolicy.getBufferSize()];
|
||||
for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
|
||||
os.write(buf, 0, c);
|
||||
os.write(buf, 0, c);
|
||||
os.flush();
|
||||
}
|
||||
os.close();
|
||||
fis.close();
|
||||
|
||||
/*
|
||||
// Read the response.
|
||||
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
|
||||
String inputLine;
|
||||
while ((inputLine = in.readLine()) != null) {
|
||||
System.out.println(inputLine);
|
||||
|
||||
if (!isSuccessfulCode(connection.getResponseCode())) {
|
||||
throw new IOException("PUT was not successful: "
|
||||
+ connection.getResponseCode() + " " + connection.getResponseMessage());
|
||||
}
|
||||
in.close();
|
||||
*/
|
||||
|
||||
// TODO we now need to ensure that the return code is OK?
|
||||
|
||||
return url;
|
||||
}
|
||||
|
||||
protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
URL url = createUploadURL(message);
|
||||
|
||||
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
||||
connection.setRequestMethod("DELETE");
|
||||
connection.connect();
|
||||
connection.disconnect();
|
||||
|
||||
if (!isSuccessfulCode(connection.getResponseCode())) {
|
||||
throw new IOException("DELETE was not successful: "
|
||||
+ connection.getResponseCode() + " " + connection.getResponseMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSuccessfulCode(int responseCode) {
|
||||
return responseCode >= 200 && responseCode < 300; // 2xx => successful
|
||||
}
|
||||
|
||||
protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
|
||||
return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,67 @@
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.BufferedWriter;
|
||||
import java.net.URL;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
|
||||
public class DefaultBlobUploadStrategyTest extends TestCase {
|
||||
|
||||
private static final String FILESERVER_URL = "http://localhost:8080/";
|
||||
private static final String URI = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8080/";
|
||||
|
||||
public static void main(String[] args) {
|
||||
junit.textui.TestRunner.run(DefaultBlobUploadStrategyTest.class);
|
||||
}
|
||||
|
||||
public void testDummy() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
public void DISABLED_UNTIL_WE_EMBED_JETTY_testUploadViaDefaultBlobUploadStrategy() throws Exception {
|
||||
// 0. Initialise
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append("Hello World!");
|
||||
writer.close();
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URI);
|
||||
BlobTransferPolicy policy = factory.getBlobTransferPolicy();
|
||||
|
||||
ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
|
||||
msg.setMessageId(new MessageId());
|
||||
|
||||
// 1. Upload
|
||||
DefaultBlobUploadStrategy strategy = new DefaultBlobUploadStrategy(policy);
|
||||
strategy.uploadFile(msg, file);
|
||||
|
||||
// 2. Download
|
||||
msg.setURL(new URL(FILESERVER_URL + msg.getMessageId()));
|
||||
|
||||
InputStream in = msg.getInputStream();
|
||||
long bytesRead = 0;
|
||||
byte[] buffer = new byte[1024*1024];
|
||||
|
||||
while (true) {
|
||||
int c = in.read(buffer);
|
||||
if (c == -1) {
|
||||
break;
|
||||
}
|
||||
bytesRead += c;
|
||||
}
|
||||
in.close();
|
||||
TestCase.assertTrue(bytesRead == file.length());
|
||||
|
||||
// 3. Delete
|
||||
strategy.deleteFile(msg);
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user