mirror of https://github.com/apache/activemq.git
AMQ-7450 - Put some restrictions on the URLs that are allowed in BlobMessages
This commit is contained in:
parent
742feba11e
commit
45108a2328
|
@ -38,6 +38,24 @@ public class DefaultBlobDownloadStrategy extends DefaultStrategy implements Blob
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do some checks on the received URL against the transfer policy
|
||||||
|
URL uploadURL = new URL(super.transferPolicy.getUploadUrl());
|
||||||
|
String protocol = message.getURL().getProtocol();
|
||||||
|
if (!protocol.equals(uploadURL.getProtocol())) {
|
||||||
|
throw new IOException("The message URL protocol is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
|
String host = message.getURL().getHost();
|
||||||
|
if (!host.equals(uploadURL.getHost())) {
|
||||||
|
throw new IOException("The message URL host is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
|
int port = message.getURL().getPort();
|
||||||
|
if (uploadURL.getPort() != 0 && port != uploadURL.getPort()) {
|
||||||
|
throw new IOException("The message URL port is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
return value.openStream();
|
return value.openStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.io.FilterInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
@ -36,6 +37,23 @@ public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownload
|
||||||
}
|
}
|
||||||
|
|
||||||
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
|
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
|
||||||
|
// Do some checks on the received URL against the transfer policy
|
||||||
|
URL uploadURL = new URL(super.transferPolicy.getUploadUrl());
|
||||||
|
String protocol = message.getURL().getProtocol();
|
||||||
|
if (!protocol.equals(uploadURL.getProtocol())) {
|
||||||
|
throw new IOException("The message URL protocol is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
|
String host = message.getURL().getHost();
|
||||||
|
if (!host.equals(uploadURL.getHost())) {
|
||||||
|
throw new IOException("The message URL host is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
|
int port = message.getURL().getPort();
|
||||||
|
if (uploadURL.getPort() != 0 && port != uploadURL.getPort()) {
|
||||||
|
throw new IOException("The message URL port is incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
url = message.getURL();
|
url = message.getURL();
|
||||||
final FTPClient ftp = createFTP();
|
final FTPClient ftp = createFTP();
|
||||||
String path = url.getPath();
|
String path = url.getPath();
|
||||||
|
|
|
@ -115,13 +115,19 @@ public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadS
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
|
protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
|
||||||
if (message.getURL() != null) {
|
if (message.getURL() != null) {
|
||||||
try {
|
// Do some checks on the received URL protocol
|
||||||
return new File(message.getURL().toURI());
|
String protocol = message.getURL().getProtocol();
|
||||||
} catch (URISyntaxException e) {
|
if (!"file".contentEquals(protocol)) {
|
||||||
IOException ioe = new IOException("Unable to open file for message " + message);
|
throw new IOException("The message URL protocol is incorrect");
|
||||||
ioe.initCause(e);
|
}
|
||||||
}
|
|
||||||
|
try {
|
||||||
|
return new File(message.getURL().toURI());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
IOException ioe = new IOException("Unable to open file for message " + message);
|
||||||
|
ioe.initCause(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//replace all : with _ to make windows more happy
|
//replace all : with _ to make windows more happy
|
||||||
String fileName = message.getJMSMessageID().replaceAll(":", "_");
|
String fileName = message.getJMSMessageID().replaceAll(":", "_");
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.blob;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class DownloadStrategyTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultBlobDownloadStrategy() throws Exception {
|
||||||
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
BlobDownloadStrategy downloadStrategy = new DefaultBlobDownloadStrategy(transferPolicy);
|
||||||
|
|
||||||
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
|
message.setURL(new URL("https://www.apache.org"));
|
||||||
|
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on an incorrect blob message URL");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now allow it
|
||||||
|
transferPolicy.setUploadUrl("https://www.apache.org");
|
||||||
|
downloadStrategy.getInputStream(message).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileBlobDownloadStrategy() throws Exception {
|
||||||
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
transferPolicy.setUploadUrl("file:/tmp/xyz");
|
||||||
|
BlobDownloadStrategy downloadStrategy = new FileSystemBlobStrategy(transferPolicy);
|
||||||
|
|
||||||
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
|
|
||||||
|
// Test protocol
|
||||||
|
message.setURL(new URL("https://www.apache.org"));
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on an incorrect blob message URL");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// expected
|
||||||
|
assertEquals("The message URL protocol is incorrect", ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFTPBlobDownloadStrategy() throws Exception {
|
||||||
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
transferPolicy.setUploadUrl("ftp://localhost:22");
|
||||||
|
BlobDownloadStrategy downloadStrategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||||
|
|
||||||
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
|
|
||||||
|
// Test protocol
|
||||||
|
message.setURL(new URL("https://www.apache.org"));
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on an incorrect blob message URL");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// expected
|
||||||
|
assertEquals("The message URL protocol is incorrect", ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test host
|
||||||
|
message.setURL(new URL("ftp://some-ip:22/somedoc"));
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on an incorrect blob message URL");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// expected
|
||||||
|
assertEquals("The message URL host is incorrect", ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test port
|
||||||
|
message.setURL(new URL("ftp://localhost:12345/somedoc"));
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on an incorrect blob message URL");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// expected
|
||||||
|
assertEquals("The message URL port is incorrect", ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is OK (but won't connect)
|
||||||
|
message.setURL(new URL("ftp://localhost:22/somedoc"));
|
||||||
|
try {
|
||||||
|
downloadStrategy.getInputStream(message);
|
||||||
|
fail("Failure expected on connection");
|
||||||
|
} catch (IOException | JMSException ex) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,8 +18,8 @@ package org.apache.activemq.blob;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -46,7 +46,9 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
||||||
wrt.close();
|
wrt.close();
|
||||||
|
|
||||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
transferPolicy.setUploadUrl(ftpUrl);
|
||||||
|
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||||
InputStream stream;
|
InputStream stream;
|
||||||
try {
|
try {
|
||||||
message.setURL(new URL(ftpUrl + "test.txt"));
|
message.setURL(new URL(ftpUrl + "test.txt"));
|
||||||
|
@ -70,9 +72,13 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWrongAuthentification() throws MalformedURLException {
|
public void testWrongAuthentification() throws Exception {
|
||||||
|
setConnection();
|
||||||
|
|
||||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
transferPolicy.setUploadUrl(ftpUrl);
|
||||||
|
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||||
try {
|
try {
|
||||||
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
|
message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/"));
|
||||||
strategy.getInputStream(message);
|
strategy.getInputStream(message);
|
||||||
|
@ -88,18 +94,18 @@ public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
|
||||||
assertTrue("Expect Exception", false);
|
assertTrue("Expect Exception", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWrongFTPPort() throws MalformedURLException {
|
public void testWrongFTPPort() throws Exception {
|
||||||
|
setConnection();
|
||||||
|
|
||||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||||
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
|
BlobTransferPolicy transferPolicy = new BlobTransferPolicy();
|
||||||
|
transferPolicy.setUploadUrl(ftpUrl);
|
||||||
|
BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(transferPolicy);
|
||||||
try {
|
try {
|
||||||
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
|
message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/"));
|
||||||
strategy.getInputStream(message);
|
strategy.getInputStream(message);
|
||||||
} catch(JMSException e) {
|
} catch (IOException e) {
|
||||||
assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage());
|
assertEquals("Wrong Exception", "The message URL port is incorrect", e.getMessage());
|
||||||
return;
|
|
||||||
} catch(Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
assertTrue("Wrong Exception "+ e, false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue