mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2239 - stomp+nio transport implementation
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813703 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5c405cbfd8
commit
d4133c4de8
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channel;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
/**
|
||||
* Implementation of InputStream using Java NIO channel,direct buffer and
|
||||
* Selector
|
||||
*/
|
||||
public class NIOBufferedInputStream extends InputStream {
|
||||
|
||||
private final static int BUFFER_SIZE = 8192;
|
||||
|
||||
private SocketChannel sc = null;
|
||||
|
||||
private ByteBuffer bb = null;
|
||||
|
||||
private Selector rs = null;
|
||||
|
||||
public NIOBufferedInputStream(ReadableByteChannel channel, int size)
|
||||
throws ClosedChannelException, IOException {
|
||||
|
||||
if (size <= 0) {
|
||||
throw new IllegalArgumentException("Buffer size <= 0");
|
||||
}
|
||||
|
||||
this.bb = ByteBuffer.allocateDirect(size);
|
||||
this.sc = (SocketChannel) channel;
|
||||
|
||||
this.sc.configureBlocking(false);
|
||||
|
||||
this.rs = Selector.open();
|
||||
|
||||
sc.register(rs, SelectionKey.OP_READ);
|
||||
|
||||
bb.position(0);
|
||||
bb.limit(0);
|
||||
}
|
||||
|
||||
public NIOBufferedInputStream(ReadableByteChannel channel)
|
||||
throws ClosedChannelException, IOException {
|
||||
this(channel, BUFFER_SIZE);
|
||||
}
|
||||
|
||||
public int available() throws IOException {
|
||||
if (!rs.isOpen())
|
||||
throw new IOException("Input Stream Closed");
|
||||
|
||||
return bb.remaining();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (rs.isOpen()) {
|
||||
rs.close();
|
||||
|
||||
if (sc.isOpen()) {
|
||||
sc.socket().shutdownInput();
|
||||
sc.socket().close();
|
||||
}
|
||||
|
||||
bb = null;
|
||||
sc = null;
|
||||
}
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
if (!rs.isOpen())
|
||||
throw new IOException("Input Stream Closed");
|
||||
|
||||
if (!bb.hasRemaining()) {
|
||||
try {
|
||||
fill(1);
|
||||
} catch (ClosedChannelException e) {
|
||||
close();
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return (bb.get() & 0xFF);
|
||||
}
|
||||
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
int bytesCopied = -1;
|
||||
|
||||
if (!rs.isOpen())
|
||||
throw new IOException("Input Stream Closed");
|
||||
|
||||
while (bytesCopied == -1) {
|
||||
if (bb.hasRemaining()) {
|
||||
bytesCopied = (len < bb.remaining() ? len : bb.remaining());
|
||||
bb.get(b, off, bytesCopied);
|
||||
} else {
|
||||
try {
|
||||
fill(1);
|
||||
} catch (ClosedChannelException e) {
|
||||
close();
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bytesCopied;
|
||||
}
|
||||
|
||||
public long skip(long n) throws IOException {
|
||||
long skiped = 0;
|
||||
|
||||
if (!rs.isOpen())
|
||||
throw new IOException("Input Stream Closed");
|
||||
|
||||
while (n > 0) {
|
||||
if (n <= bb.remaining()) {
|
||||
skiped += n;
|
||||
bb.position(bb.position() + (int) n);
|
||||
n = 0;
|
||||
} else {
|
||||
skiped += bb.remaining();
|
||||
n -= bb.remaining();
|
||||
|
||||
bb.position(bb.limit());
|
||||
|
||||
try {
|
||||
fill((int) n);
|
||||
} catch (ClosedChannelException e) {
|
||||
close();
|
||||
return skiped;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return skiped;
|
||||
}
|
||||
|
||||
private void fill(int n) throws IOException, ClosedChannelException {
|
||||
int bytesRead = -1;
|
||||
|
||||
if ((n <= 0) || (n <= bb.remaining()))
|
||||
return;
|
||||
|
||||
bb.compact();
|
||||
|
||||
n = (bb.remaining() < n ? bb.remaining() : n);
|
||||
|
||||
for (;;) {
|
||||
bytesRead = sc.read(bb);
|
||||
|
||||
if (bytesRead == -1)
|
||||
throw new ClosedChannelException();
|
||||
|
||||
n -= bytesRead;
|
||||
|
||||
if (n <= 0)
|
||||
break;
|
||||
|
||||
rs.select(0);
|
||||
rs.selectedKeys().clear();
|
||||
}
|
||||
|
||||
bb.flip();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.nio.NIOBufferedInputStream;
|
||||
import org.apache.activemq.transport.nio.NIOOutputStream;
|
||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
|
||||
/**
|
||||
* An implementation of the {@link Transport} interface for using Stomp over NIO
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class StompNIOTransport extends TcpTransport {
|
||||
|
||||
private SocketChannel channel;
|
||||
|
||||
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
|
||||
super(wireFormat, socketFactory, remoteLocation, localLocation);
|
||||
}
|
||||
|
||||
public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
|
||||
super(wireFormat, socket);
|
||||
}
|
||||
|
||||
protected void initializeStreams() throws IOException {
|
||||
channel = socket.getChannel();
|
||||
channel.configureBlocking(false);
|
||||
|
||||
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
|
||||
this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.net.ServerSocketFactory;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.nio.NIOTransport;
|
||||
import org.apache.activemq.transport.nio.NIOTransportFactory;
|
||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||
import org.apache.activemq.transport.tcp.TcpTransportServer;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.activemq.xbean.XBeanBrokerService;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
/**
|
||||
* A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
|
||||
*
|
||||
* @version $Revision: 645574 $
|
||||
*/
|
||||
public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
|
||||
|
||||
private ApplicationContext applicationContext = null;
|
||||
|
||||
protected String getDefaultWireFormatType() {
|
||||
return "stomp";
|
||||
}
|
||||
|
||||
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
|
||||
return new TcpTransportServer(this, location, serverSocketFactory) {
|
||||
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
|
||||
return new StompNIOTransport(format, socket);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
|
||||
return new StompNIOTransport(wf, socketFactory, location, localLocation);
|
||||
}
|
||||
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
return super.compositeConfigure(transport, format, options);
|
||||
}
|
||||
|
||||
protected boolean isUseInactivityMonitor(Transport transport) {
|
||||
// lets disable the inactivity monitor as stomp does not use keep alive
|
||||
// packets
|
||||
return false;
|
||||
}
|
||||
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
if (brokerService instanceof XBeanBrokerService) {
|
||||
this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
class=org.apache.activemq.transport.stomp.StompNIOTransportFactory
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision: 732672 $
|
||||
*/
|
||||
public class StompNIOTest extends StompTest {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
bindAddress = "stomp+nio://localhost:61612";
|
||||
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
|
||||
super.setUp();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- this file can only be parsed using the xbean-spring library -->
|
||||
<!-- START SNIPPET: example -->
|
||||
<beans>
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
|
||||
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
|
||||
</bean>
|
||||
|
||||
<broker useJmx="true" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector name="stomp+nio" uri="stomp+nio://localhost:61612"/>
|
||||
</transportConnectors>
|
||||
|
||||
<plugins>
|
||||
<simpleAuthenticationPlugin>
|
||||
<users>
|
||||
<authenticationUser username="system" password="manager"
|
||||
groups="users,admins"/>
|
||||
<authenticationUser username="user" password="password"
|
||||
groups="users"/>
|
||||
<authenticationUser username="guest" password="password" groups="guests"/>
|
||||
</users>
|
||||
</simpleAuthenticationPlugin>
|
||||
|
||||
|
||||
<!-- lets configure a destination based authorization mechanism -->
|
||||
<authorizationPlugin>
|
||||
<map>
|
||||
<authorizationMap>
|
||||
<authorizationEntries>
|
||||
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
|
||||
</authorizationEntries>
|
||||
</authorizationMap>
|
||||
</map>
|
||||
</authorizationPlugin>
|
||||
</plugins>
|
||||
</broker>
|
||||
|
||||
</beans>
|
Loading…
Reference in New Issue