diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 6e7e3998d0..beab1d0a35 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -29,6 +29,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; @@ -87,10 +88,12 @@ public class CommandChannel implements Service { bufferPool.stop(); } - public Command read() throws IOException { + public void read(CommandProcessor processor) throws IOException { + Command answer = null; + SocketAddress address = null; synchronized (readLock) { readBuffer.clear(); - SocketAddress address = channel.receive(readBuffer); + address = channel.receive(readBuffer); readBuffer.flip(); if (log.isDebugEnabled()) { @@ -122,11 +125,18 @@ public class CommandChannel implements Service { header.setCommand(command); } - return readStack.read(header); + answer = readStack.read(header); + } + if (answer != null) { + processor.process(answer, address); } } public void write(Command command) throws IOException { + write(command, targetAddress); + } + + public void write(Command command, SocketAddress address) throws IOException { synchronized (writeLock) { header.incrementCounter(); int size = wireFormat.tightMarshal1(command, bs); @@ -146,7 +156,7 @@ public class CommandChannel implements Service { byte[] data = buffer.toByteArray(); writeBuffer.put(data); - sendWriteBuffer(); + sendWriteBuffer(address); } else { header.setPartial(true); @@ -171,15 +181,15 @@ public class CommandChannel implements Service { // now the data writeBuffer.put(data, offset, chunkSize); offset += chunkSize; - sendWriteBuffer(); + sendWriteBuffer(address); } } } } - protected void sendWriteBuffer() throws IOException { + protected void sendWriteBuffer(SocketAddress address) throws IOException { writeBuffer.flip(); - channel.send(writeBuffer, targetAddress); + channel.send(writeBuffer, address); } // Properties @@ -215,4 +225,5 @@ public class CommandChannel implements Service { this.headerMarshaller = headerMarshaller; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java new file mode 100644 index 0000000000..4350d8cbd3 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java @@ -0,0 +1,32 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.command.Command; + +import java.net.SocketAddress; + +/** + * A callback used to process inbound commands + * + * @version $Revision$ + */ +public interface CommandProcessor { + + void process(Command command, SocketAddress address); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index cb476b9530..3a9ed544aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -28,12 +28,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; import java.nio.channels.AsynchronousCloseException; @@ -58,6 +56,10 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S private boolean trace = false; private boolean useLocalHost = true; private int port; + private CommandProcessor commandProcessor = new CommandProcessor() { + public void process(Command command, SocketAddress address) { + doConsume(command); + }}; protected UdpTransport(OpenWireFormat wireFormat) throws IOException { this.wireFormat = wireFormat; @@ -72,16 +74,23 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S this(wireFormat); this.targetAddress = socketAddress; } - + /** * A one way asynchronous send */ public void oneway(Command command) throws IOException { + oneway(command, targetAddress); + } + + /** + * A one way asynchronous send to a given address + */ + public void oneway(Command command, InetSocketAddress address) throws IOException { if (log.isDebugEnabled()) { log.debug("Sending oneway from port: " + port + " to target: " + targetAddress); } checkStarted(command); - commandChannel.write(command); + commandChannel.write(command, address); } /** @@ -96,10 +105,9 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S */ public void run() { log.trace("Consumer thread starting for: " + toString()); - while (!isClosed()) { + while (!isStopped()) { try { - Command command = commandChannel.read(); - doConsume(command); + commandChannel.read(commandProcessor); } /* * catch (SocketTimeoutException e) { } catch @@ -209,7 +217,14 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S // Implementation methods // ------------------------------------------------------------------------- + protected CommandProcessor getCommandProcessor() { + return commandProcessor; + } + protected void setCommandProcessor(CommandProcessor commandProcessor) { + this.commandProcessor = commandProcessor; + } + /** * Creates an address from the given URI */ @@ -257,4 +272,5 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S channel.close(); } } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index 9b6d9c5f8d..9b14279734 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -16,57 +16,46 @@ */ package org.apache.activemq.transport.udp; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import javax.net.ServerSocketFactory; -import javax.net.SocketFactory; import org.activeio.command.WireFormat; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.InactivityMonitor; -import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.URISupport; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Map; public class UdpTransportFactory extends TransportFactory { - private static final Log log = LogFactory.getLog(UdpTransportFactory.class); public TransportServer doBind(String brokerId, final URI location) throws IOException { - throw new IOException("TransportServer not supported for UDP"); - /* try { - Map options = new HashMap(URISupport.parseParamters(location)); - - return null; - UdpTransportServer server = new UdpTransportServer(location); - server.setWireFormatFactory(createWireFormatFactory(options)); - IntrospectionSupport.setProperties(server, options); - + UdpTransport transport = (UdpTransport) doConnect(location); + UdpTransportServer server = new UdpTransportServer(transport); return server; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } - */ + catch (Exception e) { + throw IOExceptionSupport.create(e); + } } public Transport configure(Transport transport, WireFormat format, Map options) { IntrospectionSupport.setProperties(transport, options); UdpTransport tcpTransport = (UdpTransport) transport; - - /* + if (tcpTransport.isTrace()) { transport = new TransportLogger(transport); } @@ -76,7 +65,6 @@ public class UdpTransportFactory extends TransportFactory { } transport = new ResponseCorrelator(transport); - */ return transport; } @@ -94,25 +82,6 @@ public class UdpTransportFactory extends TransportFactory { } protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { - /* - URI localLocation = null; - String path = location.getPath(); - // see if the path is a local URI location - if (path != null && path.length() > 0) { - int localPortIndex = path.indexOf(':'); - try { - Integer.parseInt(path.substring((localPortIndex + 1), path.length())); - String localString = location.getScheme() + ":/" + path; - localLocation = new URI(localString); - } - catch (Exception e) { - log.warn("path isn't a valid local location for TcpTransport to use", e); - } - } - if (localLocation != null) { - return new UdpTransport(wf, location, localLocation); - } - */ OpenWireFormat wireFormat = (OpenWireFormat) wf; wireFormat.setPrefixPacketSize(false); return new UdpTransport(wireFormat, location); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java new file mode 100755 index 0000000000..2155fc2827 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -0,0 +1,119 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.InactivityMonitor; +import org.apache.activemq.transport.ResponseCorrelator; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.TransportServerSupport; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; + +/** + * A UDP based implementation of {@link TransportServer} + * + * @version $Revision$ + */ + +public class UdpTransportServer extends TransportServerSupport { + private static final Log log = LogFactory.getLog(UdpTransportServer.class); + + private UdpTransport serverTransport; + private Map transports = new HashMap(); + + public UdpTransportServer(UdpTransport serverTransport) { + this.serverTransport = serverTransport; + } + + public String toString() { + return "UdpTransportServer@" + serverTransport; + } + + public void run() { + } + + public UdpTransport getServerTransport() { + return serverTransport; + } + + public void setBrokerInfo(BrokerInfo brokerInfo) { + } + + protected void doStart() throws Exception { + serverTransport.start(); + serverTransport.setCommandProcessor(new CommandProcessor() { + public void process(Command command, SocketAddress address) { + onInboundCommand(command, address); + } + }); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + serverTransport.stop(); + } + + protected void onInboundCommand(Command command, SocketAddress address) { + Transport transport = null; + synchronized (transports) { + transport = (Transport) transports.get(address); + if (transport == null) { + transport = createTransport(address); + transport = configureTransport(transport); + transports.put(address, transport); + } + } + processInboundCommand(command, transport); + } + + public void sendOutboundCommand(Command command, SocketAddress address) { + // TODO we should use an inbound buffer to make this async + + } + + protected void processInboundCommand(Command command, Transport transport) { + // TODO - consider making this asynchronous + TransportListener listener = transport.getTransportListener(); + if (listener != null) { + listener.onCommand(command); + } + else { + log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command); + } + } + + protected Transport configureTransport(Transport transport) { + transport = new ResponseCorrelator(transport); + transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); + getAcceptListener().onAccept(transport); + return transport; + } + + protected TransportSupport createTransport(SocketAddress address) { + return new UdpTransportServerClient(this, address); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java new file mode 100755 index 0000000000..922fc9653c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java @@ -0,0 +1,71 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * A logical server side transport instance for a remote client which works with + * the {@link UdpTransportServer} + * + * @version $Revision$ + */ +public class UdpTransportServerClient extends TransportSupport { + private static final Log log = LogFactory.getLog(UdpTransportServerClient.class); + + private UdpTransportServer server; + private SocketAddress address; + private List queue = Collections.synchronizedList(new LinkedList()); + + public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) { + this.server = server; + this.address = address; + } + + public String toString() { + return "UdpClient@" + address; + } + + public void oneway(Command command) throws IOException { + checkStarted(command); + server.sendOutboundCommand(command, address); + } + + protected void doStart() throws Exception { + for (Iterator iter = queue.iterator(); iter.hasNext();) { + Command command = (Command) iter.next(); + doConsume(command); + iter.remove(); + } + } + + protected void doStop(ServiceStopper stopper) throws Exception { + queue.clear(); + } + +}