mirror of https://github.com/apache/activemq.git
[AMQ-8341] Remove FTP Blob strategy
- Remove commons-net dependency
This commit is contained in:
parent
0585a2998d
commit
2e784d07be
|
@ -58,13 +58,6 @@
|
|||
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- for ftp blob upload/download -->
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- for zerconf discovery -->
|
||||
<dependency>
|
||||
<groupId>javax.jmdns</groupId>
|
||||
|
|
|
@ -124,7 +124,7 @@ public class BlobTransferPolicy {
|
|||
|
||||
/**
|
||||
* Returns the upload strategy depending on the information from the
|
||||
* uploadURL. Currently supportet HTTP and FTP
|
||||
* uploadURL. Currently supportet HTTP
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
|
@ -133,9 +133,7 @@ public class BlobTransferPolicy {
|
|||
try {
|
||||
URL url = new URL(getUploadUrl());
|
||||
|
||||
if(url.getProtocol().equalsIgnoreCase("FTP")) {
|
||||
strategy = new FTPBlobUploadStrategy(this);
|
||||
} else if (url.getProtocol().equalsIgnoreCase("FILE")) {
|
||||
if (url.getProtocol().equalsIgnoreCase("FILE")) {
|
||||
strategy = new FileSystemBlobStrategy(this);
|
||||
} else {
|
||||
strategy = new DefaultBlobUploadStrategy(this);
|
||||
|
@ -150,7 +148,7 @@ public class BlobTransferPolicy {
|
|||
|
||||
/**
|
||||
* Returns the download strategy depending on the information from the
|
||||
* uploadURL. Currently supportet HTTP and FTP
|
||||
* uploadURL. Currently supportet HTTP
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
|
@ -159,9 +157,7 @@ public class BlobTransferPolicy {
|
|||
try {
|
||||
URL url = new URL(getUploadUrl());
|
||||
|
||||
if(url.getProtocol().equalsIgnoreCase("FTP")) {
|
||||
strategy = new FTPBlobDownloadStrategy(this);
|
||||
} else if (url.getProtocol().equalsIgnoreCase("FILE")) {
|
||||
if (url.getProtocol().equalsIgnoreCase("FILE")) {
|
||||
strategy = new FileSystemBlobStrategy(this);
|
||||
} else {
|
||||
strategy = new DefaultBlobDownloadStrategy(this);
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
|
||||
/**
|
||||
* A FTP implementation for {@link BlobDownloadStrategy}.
|
||||
*/
|
||||
public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownloadStrategy {
|
||||
|
||||
public FTPBlobDownloadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
// Do some checks on the received URL against the transfer policy
|
||||
URL uploadURL = new URL(super.transferPolicy.getUploadUrl());
|
||||
String protocol = message.getURL().getProtocol();
|
||||
if (!protocol.equals(uploadURL.getProtocol())) {
|
||||
throw new IOException("The message URL protocol is incorrect");
|
||||
}
|
||||
|
||||
String host = message.getURL().getHost();
|
||||
if (!host.equals(uploadURL.getHost())) {
|
||||
throw new IOException("The message URL host is incorrect");
|
||||
}
|
||||
|
||||
int port = message.getURL().getPort();
|
||||
if (uploadURL.getPort() != 0 && port != uploadURL.getPort()) {
|
||||
throw new IOException("The message URL port is incorrect");
|
||||
}
|
||||
|
||||
url = message.getURL();
|
||||
final FTPClient ftp = createFTP();
|
||||
String path = url.getPath();
|
||||
String workingDir = path.substring(0, path.lastIndexOf("/"));
|
||||
String file = path.substring(path.lastIndexOf("/") + 1);
|
||||
ftp.changeWorkingDirectory(workingDir);
|
||||
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
|
||||
|
||||
InputStream input = new FilterInputStream(ftp.retrieveFileStream(file)) {
|
||||
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
}
|
||||
};
|
||||
|
||||
return input;
|
||||
}
|
||||
|
||||
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||
url = message.getURL();
|
||||
final FTPClient ftp = createFTP();
|
||||
|
||||
String path = url.getPath();
|
||||
try {
|
||||
if (!ftp.deleteFile(path)) {
|
||||
throw new JMSException("Delete file failed: " + ftp.getReplyString());
|
||||
}
|
||||
} finally {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
|
||||
/**
|
||||
* A FTP implementation of {@link BlobUploadStrategy}.
|
||||
*/
|
||||
public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStrategy {
|
||||
|
||||
public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
super(transferPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
|
||||
try(FileInputStream fis = new FileInputStream(file)) {
|
||||
return uploadStream(message, fis);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
|
||||
throws JMSException, IOException {
|
||||
|
||||
FTPClient ftp = createFTP();
|
||||
try {
|
||||
String path = url.getPath();
|
||||
String workingDir = path.substring(0, path.lastIndexOf("/"));
|
||||
String filename = message.getMessageId().toString().replaceAll(":", "_");
|
||||
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
|
||||
|
||||
String url;
|
||||
if(!ftp.changeWorkingDirectory(workingDir)) {
|
||||
url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/";
|
||||
} else {
|
||||
url = this.url.toString();
|
||||
}
|
||||
|
||||
if (!ftp.storeFile(filename, in)) {
|
||||
throw new JMSException("FTP store failed: " + ftp.getReplyString());
|
||||
}
|
||||
return new URL(url + filename);
|
||||
} finally {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
|
||||
public class FTPStrategy {
|
||||
|
||||
protected BlobTransferPolicy transferPolicy;
|
||||
protected URL url;
|
||||
protected String ftpUser = "";
|
||||
protected String ftpPass = "";
|
||||
|
||||
public FTPStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
|
||||
this.transferPolicy = transferPolicy;
|
||||
this.url = new URL(this.transferPolicy.getUploadUrl());
|
||||
}
|
||||
|
||||
protected void setUserInformation(String userInfo) {
|
||||
if(userInfo != null) {
|
||||
String[] userPass = userInfo.split(":");
|
||||
if(userPass.length > 0) this.ftpUser = userPass[0];
|
||||
if(userPass.length > 1) this.ftpPass = userPass[1];
|
||||
} else {
|
||||
this.ftpUser = "anonymous";
|
||||
this.ftpPass = "anonymous";
|
||||
}
|
||||
}
|
||||
|
||||
protected FTPClient createFTP() throws IOException, JMSException {
|
||||
String connectUrl = url.getHost();
|
||||
setUserInformation(url.getUserInfo());
|
||||
int port = url.getPort() < 1 ? 21 : url.getPort();
|
||||
|
||||
FTPClient ftp = new FTPClient();
|
||||
try {
|
||||
ftp.connect(connectUrl, port);
|
||||
} catch(ConnectException e) {
|
||||
throw new JMSException("Problem connecting the FTP-server");
|
||||
}
|
||||
if(!ftp.login(ftpUser, ftpPass)) {
|
||||
ftp.quit();
|
||||
ftp.disconnect();
|
||||
throw new JMSException("Cant Authentificate to FTP-Server");
|
||||
}
|
||||
return ftp;
|
||||
}
|
||||
|
||||
}
|
|
@ -69,51 +69,4 @@ public class DownloadStrategyTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFTPBlobDownloadStrategy() throws Exception {
|
||||
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||
transferPolicy.setUploadUrl("ftp://localhost:22");
|
||||
BlobDownloadStrategy downloadStrategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
|
||||
// Test protocol
|
||||
message.setURL(new URL("https://www.apache.org"));
|
||||
try {
|
||||
downloadStrategy.getInputStream(message);
|
||||
fail("Failure expected on an incorrect blob message URL");
|
||||
} catch (IOException ex) {
|
||||
// expected
|
||||
assertEquals("The message URL protocol is incorrect", ex.getMessage());
|
||||
}
|
||||
|
||||
// Test host
|
||||
message.setURL(new URL("ftp://some-ip:22/somedoc"));
|
||||
try {
|
||||
downloadStrategy.getInputStream(message);
|
||||
fail("Failure expected on an incorrect blob message URL");
|
||||
} catch (IOException ex) {
|
||||
// expected
|
||||
assertEquals("The message URL host is incorrect", ex.getMessage());
|
||||
}
|
||||
|
||||
// Test port
|
||||
message.setURL(new URL("ftp://localhost:12345/somedoc"));
|
||||
try {
|
||||
downloadStrategy.getInputStream(message);
|
||||
fail("Failure expected on an incorrect blob message URL");
|
||||
} catch (IOException ex) {
|
||||
// expected
|
||||
assertEquals("The message URL port is incorrect", ex.getMessage());
|
||||
}
|
||||
|
||||
// This is OK (but won't connect)
|
||||
message.setURL(new URL("ftp://localhost:22/somedoc"));
|
||||
try {
|
||||
downloadStrategy.getInputStream(message);
|
||||
fail("Failure expected on connection");
|
||||
} catch (IOException | JMSException ex) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
}
|
|
@ -111,10 +111,6 @@
|
|||
<artifactId>xalan</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- =============================== -->
|
||||
<!-- Testing Dependencies -->
|
||||
|
|
|
@ -31,7 +31,6 @@
|
|||
<bundle dependency="true">mvn:org.jvnet.jaxb2_commons/jaxb2-basics-runtime/${jaxb-basics-version}</bundle>
|
||||
<bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jaxb-impl/${jaxb-bundle-version}</bundle>
|
||||
<bundle>mvn:org.apache.commons/commons-pool2/${commons-pool2-version}</bundle>
|
||||
<bundle>mvn:commons-net/commons-net/${commons-net-version}</bundle>
|
||||
<!-- uber osgi bundle means client is not that lean, todo: introduce client osgi bundle -->
|
||||
<bundle>mvn:org.apache.activemq/activemq-osgi/${project.version}</bundle>
|
||||
</feature>
|
||||
|
|
|
@ -121,10 +121,6 @@
|
|||
<artifactId>xalan</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- =============================== -->
|
||||
<!-- Testing Dependencies -->
|
||||
|
|
|
@ -147,10 +147,6 @@
|
|||
<artifactId>xalan</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- =============================== -->
|
||||
<!-- Testing Dependencies -->
|
||||
|
@ -294,12 +290,6 @@
|
|||
<version>3.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ftpserver</groupId>
|
||||
<artifactId>ftpserver-core</artifactId>
|
||||
<version>${ftpserver-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-inline</artifactId>
|
||||
|
|
|
@ -1,114 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
||||
|
||||
final int FILE_SIZE = Short.MAX_VALUE * 10;
|
||||
|
||||
public void testDownload() throws Exception {
|
||||
setConnection();
|
||||
|
||||
// create file
|
||||
File uploadFile = new File(ftpHomeDirFile, "test.txt");
|
||||
FileWriter wrt = new FileWriter(uploadFile);
|
||||
|
||||
wrt.write("hello world");
|
||||
|
||||
for(int ix = 0; ix < FILE_SIZE; ++ix ) {
|
||||
wrt.write("a");
|
||||
}
|
||||
|
||||
wrt.close();
|
||||
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||
transferPolicy.setUploadUrl(ftpUrl);
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||
InputStream stream;
|
||||
try {
|
||||
message.setURL(new URL(ftpUrl + "test.txt"));
|
||||
stream = strategy.getInputStream(message);
|
||||
int i = stream.read();
|
||||
StringBuilder sb = new StringBuilder(2048);
|
||||
while(i != -1) {
|
||||
sb.append((char)i);
|
||||
i = stream.read();
|
||||
}
|
||||
assertEquals("hello world", sb.toString().substring(0, "hello world".length()));
|
||||
assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length());
|
||||
|
||||
assertTrue(uploadFile.exists());
|
||||
strategy.deleteFile(message);
|
||||
assertFalse(uploadFile.exists());
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
public void testWrongAuthentification() throws Exception {
|
||||
setConnection();
|
||||
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||
transferPolicy.setUploadUrl(ftpUrl);
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||
try {
|
||||
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
|
||||
strategy.getInputStream(message);
|
||||
} catch(JMSException e) {
|
||||
assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage());
|
||||
return;
|
||||
} catch(Exception e) {
|
||||
System.out.println(e);
|
||||
assertTrue("Wrong Exception "+ e, false);
|
||||
return;
|
||||
}
|
||||
|
||||
assertTrue("Expect Exception", false);
|
||||
}
|
||||
|
||||
public void testWrongFTPPort() throws Exception {
|
||||
setConnection();
|
||||
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||
transferPolicy.setUploadUrl(ftpUrl);
|
||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||
try {
|
||||
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
|
||||
strategy.getInputStream(message);
|
||||
} catch (IOException e) {
|
||||
assertEquals("Wrong Exception", "The message URL port is incorrect", e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
assertTrue("Expect Exception", false);
|
||||
}
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.InputStream;
|
||||
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.BlobMessage;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
public class FTPBlobTest extends FTPTestSupport {
|
||||
|
||||
public void testBlobFile() throws Exception {
|
||||
setConnection();
|
||||
// first create Message
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
String content = "hello world " + System.currentTimeMillis();
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append(content);
|
||||
writer.close();
|
||||
|
||||
ActiveMQSession session = (ActiveMQSession) connection.createSession(
|
||||
false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
BlobMessage message = session.createBlobMessage(file);
|
||||
message.setName("fileName");
|
||||
|
||||
producer.send(message);
|
||||
Thread.sleep(1000);
|
||||
|
||||
// check message send
|
||||
Message msg = consumer.receive(1000);
|
||||
assertTrue(msg instanceof ActiveMQBlobMessage);
|
||||
|
||||
assertEquals("name is correct", "fileName", ((ActiveMQBlobMessage)msg).getName());
|
||||
InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
|
||||
StringBuilder b = new StringBuilder();
|
||||
int i = input.read();
|
||||
while (i != -1) {
|
||||
b.append((char) i);
|
||||
i = input.read();
|
||||
}
|
||||
input.close();
|
||||
File uploaded = new File(ftpHomeDirFile, msg.getJMSMessageID().toString().replace(":", "_"));
|
||||
assertEquals(content, b.toString());
|
||||
assertTrue(uploaded.exists());
|
||||
((ActiveMQBlobMessage)msg).deleteFile();
|
||||
assertFalse(uploaded.exists());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
|
||||
public class FTPBlobUploadStrategyTest extends FTPTestSupport {
|
||||
|
||||
public void testFileUpload() throws Exception {
|
||||
setConnection();
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append("hello world");
|
||||
writer.close();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
((ActiveMQConnection)connection).setCopyMessageOnSend(false);
|
||||
|
||||
ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession)session).createBlobMessage(file);
|
||||
message.setJMSMessageID("testmessage");
|
||||
message.onSend();
|
||||
assertEquals(ftpUrl + "ID_testmessage", message.getURL().toString());
|
||||
File uploaded = new File(ftpHomeDirFile, "ID_testmessage");
|
||||
assertTrue("File doesn't exists", uploaded.exists());
|
||||
}
|
||||
|
||||
public void testWriteDenied() throws Exception {
|
||||
userNamePass = "guest";
|
||||
setConnection();
|
||||
File file = File.createTempFile("amq-data-file-", ".dat");
|
||||
// lets write some data
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
|
||||
writer.append("hello world");
|
||||
writer.close();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
((ActiveMQConnection)connection).setCopyMessageOnSend(false);
|
||||
|
||||
ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession)session).createBlobMessage(file);
|
||||
message.setJMSMessageID("testmessage");
|
||||
try {
|
||||
message.onSend();
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
fail("Should have failed with permission denied exception!");
|
||||
}
|
||||
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.ftpserver.FtpServer;
|
||||
import org.apache.ftpserver.FtpServerFactory;
|
||||
import org.apache.ftpserver.ftplet.Authority;
|
||||
import org.apache.ftpserver.ftplet.UserManager;
|
||||
import org.apache.ftpserver.listener.ListenerFactory;
|
||||
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
|
||||
import org.apache.ftpserver.usermanager.impl.BaseUser;
|
||||
import org.apache.ftpserver.usermanager.impl.WritePermission;
|
||||
import org.jmock.Mockery;
|
||||
|
||||
public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
|
||||
|
||||
protected static final String ftpServerListenerName = "default";
|
||||
protected Connection connection;
|
||||
protected FtpServer server;
|
||||
String userNamePass = "activemq";
|
||||
|
||||
Mockery context = null;
|
||||
String ftpUrl;
|
||||
int ftpPort;
|
||||
|
||||
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
|
||||
if (ftpHomeDirFile.getParentFile().exists()) {
|
||||
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
|
||||
}
|
||||
ftpHomeDirFile.mkdirs();
|
||||
ftpHomeDirFile.getParentFile().deleteOnExit();
|
||||
|
||||
FtpServerFactory serverFactory = new FtpServerFactory();
|
||||
ListenerFactory factory = new ListenerFactory();
|
||||
|
||||
PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
|
||||
UserManager userManager = userManagerFactory.createUserManager();
|
||||
|
||||
BaseUser user = new BaseUser();
|
||||
user.setName("activemq");
|
||||
user.setPassword("activemq");
|
||||
user.setHomeDirectory(ftpHomeDirFile.getParent());
|
||||
|
||||
// authorize user
|
||||
List<Authority> auths = new ArrayList<Authority>();
|
||||
Authority auth = new WritePermission();
|
||||
auths.add(auth);
|
||||
user.setAuthorities(auths);
|
||||
|
||||
userManager.save(user);
|
||||
|
||||
BaseUser guest = new BaseUser();
|
||||
guest.setName("guest");
|
||||
guest.setPassword("guest");
|
||||
guest.setHomeDirectory(ftpHomeDirFile.getParent());
|
||||
|
||||
userManager.save(guest);
|
||||
|
||||
serverFactory.setUserManager(userManager);
|
||||
factory.setPort(0);
|
||||
serverFactory.addListener(ftpServerListenerName, factory
|
||||
.createListener());
|
||||
server = serverFactory.createServer();
|
||||
server.start();
|
||||
ftpPort = serverFactory.getListener(ftpServerListenerName)
|
||||
.getPort();
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
public void setConnection() throws Exception {
|
||||
ftpUrl = "ftp://"
|
||||
+ userNamePass
|
||||
+ ":"
|
||||
+ userNamePass
|
||||
+ "@localhost:"
|
||||
+ ftpPort
|
||||
+ "/ftptest/";
|
||||
bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + ftpUrl;
|
||||
|
||||
connectionFactory = createConnectionFactory();
|
||||
|
||||
connection = createConnection();
|
||||
connection.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
super.tearDown();
|
||||
if (server != null) {
|
||||
server.stop();
|
||||
}
|
||||
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -192,7 +192,6 @@
|
|||
<include>org.apache.commons:commons-dbcp2</include>
|
||||
<include>org.apache.commons:commons-pool2</include>
|
||||
<include>commons-codec:commons-codec</include>
|
||||
<include>commons-net:commons-net</include>
|
||||
<include>org.apache.commons:commons-lang3</include>
|
||||
<include>org.slf4j:slf4j-log4j12</include>
|
||||
<include>log4j:log4j</include>
|
||||
|
|
8
pom.xml
8
pom.xml
|
@ -54,7 +54,6 @@
|
|||
<commons-logging-version>1.2</commons-logging-version>
|
||||
<commons-pool2-version>2.11.1</commons-pool2-version>
|
||||
<commons-primitives-version>1.0</commons-primitives-version>
|
||||
<commons-net-version>3.8.0</commons-net-version>
|
||||
<directory-version>2.0.0.AM25</directory-version>
|
||||
<ecj.version>3.17.0</ecj.version>
|
||||
<ftpserver-version>1.1.1</ftpserver-version>
|
||||
|
@ -998,13 +997,6 @@
|
|||
<version>3.1.4</version>
|
||||
</dependency>
|
||||
|
||||
<!-- FTP support for BlobMessages -->
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
<version>${commons-net-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.velocity</groupId>
|
||||
<artifactId>velocity-engine-core</artifactId>
|
||||
|
|
Loading…
Reference in New Issue