added spike of a UDP server together with applying the transport refactorings to the UDP transport

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384225 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-08 14:44:35 +00:00
parent 11c37a7be5
commit 08f4639d5e
6 changed files with 278 additions and 60 deletions

View File

@ -29,6 +29,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
@ -87,10 +88,12 @@ public class CommandChannel implements Service {
bufferPool.stop();
}
public Command read() throws IOException {
public void read(CommandProcessor processor) throws IOException {
Command answer = null;
SocketAddress address = null;
synchronized (readLock) {
readBuffer.clear();
SocketAddress address = channel.receive(readBuffer);
address = channel.receive(readBuffer);
readBuffer.flip();
if (log.isDebugEnabled()) {
@ -122,11 +125,18 @@ public class CommandChannel implements Service {
header.setCommand(command);
}
return readStack.read(header);
answer = readStack.read(header);
}
if (answer != null) {
processor.process(answer, address);
}
}
public void write(Command command) throws IOException {
write(command, targetAddress);
}
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
header.incrementCounter();
int size = wireFormat.tightMarshal1(command, bs);
@ -146,7 +156,7 @@ public class CommandChannel implements Service {
byte[] data = buffer.toByteArray();
writeBuffer.put(data);
sendWriteBuffer();
sendWriteBuffer(address);
}
else {
header.setPartial(true);
@ -171,15 +181,15 @@ public class CommandChannel implements Service {
// now the data
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
sendWriteBuffer();
sendWriteBuffer(address);
}
}
}
}
protected void sendWriteBuffer() throws IOException {
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
channel.send(writeBuffer, targetAddress);
channel.send(writeBuffer, address);
}
// Properties
@ -215,4 +225,5 @@ public class CommandChannel implements Service {
this.headerMarshaller = headerMarshaller;
}
}

View File

@ -0,0 +1,32 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command;
import java.net.SocketAddress;
/**
* A callback used to process inbound commands
*
* @version $Revision$
*/
public interface CommandProcessor {
void process(Command command, SocketAddress address);
}

View File

@ -28,12 +28,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
@ -58,6 +56,10 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private boolean trace = false;
private boolean useLocalHost = true;
private int port;
private CommandProcessor commandProcessor = new CommandProcessor() {
public void process(Command command, SocketAddress address) {
doConsume(command);
}};
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@ -72,16 +74,23 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this(wireFormat);
this.targetAddress = socketAddress;
}
/**
* A one way asynchronous send
*/
public void oneway(Command command) throws IOException {
oneway(command, targetAddress);
}
/**
* A one way asynchronous send to a given address
*/
public void oneway(Command command, InetSocketAddress address) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
}
checkStarted(command);
commandChannel.write(command);
commandChannel.write(command, address);
}
/**
@ -96,10 +105,9 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
*/
public void run() {
log.trace("Consumer thread starting for: " + toString());
while (!isClosed()) {
while (!isStopped()) {
try {
Command command = commandChannel.read();
doConsume(command);
commandChannel.read(commandProcessor);
}
/*
* catch (SocketTimeoutException e) { } catch
@ -209,7 +217,14 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// Implementation methods
// -------------------------------------------------------------------------
protected CommandProcessor getCommandProcessor() {
return commandProcessor;
}
protected void setCommandProcessor(CommandProcessor commandProcessor) {
this.commandProcessor = commandProcessor;
}
/**
* Creates an address from the given URI
*/
@ -257,4 +272,5 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
channel.close();
}
}
}

View File

@ -16,57 +16,46 @@
*/
package org.apache.activemq.transport.udp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
public class UdpTransportFactory extends TransportFactory {
private static final Log log = LogFactory.getLog(UdpTransportFactory.class);
public TransportServer doBind(String brokerId, final URI location) throws IOException {
throw new IOException("TransportServer not supported for UDP");
/*
try {
Map options = new HashMap(URISupport.parseParamters(location));
return null;
UdpTransportServer server = new UdpTransportServer(location);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
UdpTransport transport = (UdpTransport) doConnect(location);
UdpTransportServer server = new UdpTransportServer(transport);
return server;
}
catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
*/
catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport tcpTransport = (UdpTransport) transport;
/*
if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
@ -76,7 +65,6 @@ public class UdpTransportFactory extends TransportFactory {
}
transport = new ResponseCorrelator(transport);
*/
return transport;
}
@ -94,25 +82,6 @@ public class UdpTransportFactory extends TransportFactory {
}
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
/*
URI localLocation = null;
String path = location.getPath();
// see if the path is a local URI location
if (path != null && path.length() > 0) {
int localPortIndex = path.indexOf(':');
try {
Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
}
catch (Exception e) {
log.warn("path isn't a valid local location for TcpTransport to use", e);
}
}
if (localLocation != null) {
return new UdpTransport(wf, location, localLocation);
}
*/
OpenWireFormat wireFormat = (OpenWireFormat) wf;
wireFormat.setPrefixPacketSize(false);
return new UdpTransport(wireFormat, location);

View File

@ -0,0 +1,119 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* A UDP based implementation of {@link TransportServer}
*
* @version $Revision$
*/
public class UdpTransportServer extends TransportServerSupport {
private static final Log log = LogFactory.getLog(UdpTransportServer.class);
private UdpTransport serverTransport;
private Map transports = new HashMap();
public UdpTransportServer(UdpTransport serverTransport) {
this.serverTransport = serverTransport;
}
public String toString() {
return "UdpTransportServer@" + serverTransport;
}
public void run() {
}
public UdpTransport getServerTransport() {
return serverTransport;
}
public void setBrokerInfo(BrokerInfo brokerInfo) {
}
protected void doStart() throws Exception {
serverTransport.start();
serverTransport.setCommandProcessor(new CommandProcessor() {
public void process(Command command, SocketAddress address) {
onInboundCommand(command, address);
}
});
}
protected void doStop(ServiceStopper stopper) throws Exception {
serverTransport.stop();
}
protected void onInboundCommand(Command command, SocketAddress address) {
Transport transport = null;
synchronized (transports) {
transport = (Transport) transports.get(address);
if (transport == null) {
transport = createTransport(address);
transport = configureTransport(transport);
transports.put(address, transport);
}
}
processInboundCommand(command, transport);
}
public void sendOutboundCommand(Command command, SocketAddress address) {
// TODO we should use an inbound buffer to make this async
}
protected void processInboundCommand(Command command, Transport transport) {
// TODO - consider making this asynchronous
TransportListener listener = transport.getTransportListener();
if (listener != null) {
listener.onCommand(command);
}
else {
log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command);
}
}
protected Transport configureTransport(Transport transport) {
transport = new ResponseCorrelator(transport);
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
getAcceptListener().onAccept(transport);
return transport;
}
protected TransportSupport createTransport(SocketAddress address) {
return new UdpTransportServerClient(this, address);
}
}

View File

@ -0,0 +1,71 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* A logical server side transport instance for a remote client which works with
* the {@link UdpTransportServer}
*
* @version $Revision$
*/
public class UdpTransportServerClient extends TransportSupport {
private static final Log log = LogFactory.getLog(UdpTransportServerClient.class);
private UdpTransportServer server;
private SocketAddress address;
private List queue = Collections.synchronizedList(new LinkedList());
public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) {
this.server = server;
this.address = address;
}
public String toString() {
return "UdpClient@" + address;
}
public void oneway(Command command) throws IOException {
checkStarted(command);
server.sendOutboundCommand(command, address);
}
protected void doStart() throws Exception {
for (Iterator iter = queue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
doConsume(command);
iter.remove();
}
}
protected void doStop(ServiceStopper stopper) throws Exception {
queue.clear();
}
}