mirror of https://github.com/apache/activemq.git
initial spike of a UDP based transport - completely untested so far :)
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383893 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fb2902900a
commit
eba4c9cfa8
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Represents a pool of {@link ByteBuffer} instances.
|
||||
* This strategy could just create new buffers for each call or
|
||||
* it could pool them.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public interface ByteBufferPool extends Service {
|
||||
|
||||
/**
|
||||
* Extract a buffer from the pool.
|
||||
*/
|
||||
ByteBuffer borrowBuffer();
|
||||
|
||||
/**
|
||||
* Returns the buffer to the pool or just discards it for a non-pool strategy
|
||||
*/
|
||||
void returnBuffer(ByteBuffer buffer);
|
||||
|
||||
/**
|
||||
* Sets the default size of the buffers
|
||||
*/
|
||||
void setDefaultSize(int defaultSize);
|
||||
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.openwire.BooleanStream;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.Channels;
|
||||
|
||||
/**
|
||||
* A strategy for reading datagrams and de-fragmenting them together.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class CommandChannel implements Service {
|
||||
|
||||
private static final Log log = LogFactory.getLog(CommandChannel.class);
|
||||
|
||||
private ByteChannel channel;
|
||||
private OpenWireFormat wireFormat;
|
||||
private ByteBufferPool bufferPool;
|
||||
private int datagramSize = 4 * 1024;
|
||||
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
|
||||
|
||||
// reading
|
||||
private ByteBuffer readBuffer;
|
||||
private DataInputStream dataIn;
|
||||
private CommandReadBuffer readStack;
|
||||
|
||||
// writing
|
||||
private ByteBuffer writeBuffer;
|
||||
private BooleanStream bs = new BooleanStream();
|
||||
private DataOutputStream dataOut;
|
||||
private int largeMessageBufferSize = 128 * 1024;
|
||||
private DatagramHeader header = new DatagramHeader();
|
||||
|
||||
|
||||
public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize) {
|
||||
this.channel = channel;
|
||||
this.wireFormat = wireFormat;
|
||||
this.bufferPool = bufferPool;
|
||||
this.datagramSize = datagramSize;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
readStack = new CommandReadBuffer(wireFormat);
|
||||
bufferPool.setDefaultSize(datagramSize);
|
||||
bufferPool.start();
|
||||
readBuffer = bufferPool.borrowBuffer();
|
||||
writeBuffer = bufferPool.borrowBuffer();
|
||||
dataIn = new DataInputStream(Channels.newInputStream(channel));
|
||||
dataOut = new DataOutputStream(Channels.newOutputStream(channel));
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
bufferPool.stop();
|
||||
}
|
||||
|
||||
public synchronized Command read() throws IOException {
|
||||
readBuffer.clear();
|
||||
int read = channel.read(readBuffer);
|
||||
DatagramHeader header = headerMarshaller.readHeader(readBuffer);
|
||||
|
||||
int remaining = readBuffer.remaining();
|
||||
int size = header.getDataSize();
|
||||
if (size > remaining) {
|
||||
throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
|
||||
}
|
||||
else if (size < remaining) {
|
||||
log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
|
||||
}
|
||||
if (header.isPartial()) {
|
||||
byte[] data = new byte[size];
|
||||
readBuffer.get(data);
|
||||
header.setPartialData(data);
|
||||
}
|
||||
else {
|
||||
Command command = (Command) wireFormat.unmarshal(dataIn);
|
||||
header.setCommand(command);
|
||||
}
|
||||
|
||||
return readStack.read(header);
|
||||
}
|
||||
|
||||
public synchronized void write(Command command) throws IOException {
|
||||
header.incrementCounter();
|
||||
int size = wireFormat.tightMarshalNestedObject1(command, bs);
|
||||
if (size < datagramSize ) {
|
||||
header.setPartial(false);
|
||||
writeBuffer.rewind();
|
||||
wireFormat.marshal(command, dataOut);
|
||||
dataOut.flush();
|
||||
channel.write(writeBuffer);
|
||||
}
|
||||
else {
|
||||
header.setPartial(true);
|
||||
header.setComplete(false);
|
||||
|
||||
// lets split the command up into chunks
|
||||
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
|
||||
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
|
||||
|
||||
byte[] data = largeBuffer.toByteArray();
|
||||
int offset = 0;
|
||||
boolean lastFragment = false;
|
||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++ ) {
|
||||
// write the header
|
||||
writeBuffer.rewind();
|
||||
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
||||
lastFragment = offset + chunkSize >= length;
|
||||
header.setComplete(lastFragment);
|
||||
headerMarshaller.writeHeader(header, writeBuffer);
|
||||
|
||||
// now the data
|
||||
writeBuffer.put(data, offset, chunkSize);
|
||||
offset += chunkSize;
|
||||
channel.write(writeBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
public int getDatagramSize() {
|
||||
return datagramSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the default size of a datagram on the network.
|
||||
*/
|
||||
public void setDatagramSize(int datagramSize) {
|
||||
this.datagramSize = datagramSize;
|
||||
}
|
||||
|
||||
public ByteBufferPool getBufferPool() {
|
||||
return bufferPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the implementation of the byte buffer pool to use
|
||||
*/
|
||||
public void setBufferPool(ByteBufferPool bufferPool) {
|
||||
this.bufferPool = bufferPool;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* Buffers up incoming headers to reorder them. This class is only accessed by
|
||||
* one thread at once.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class CommandReadBuffer {
|
||||
|
||||
private OpenWireFormat wireFormat;
|
||||
private SortedSet headers = new TreeSet();
|
||||
private int expectedCounter;
|
||||
private ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
public CommandReadBuffer(OpenWireFormat wireFormat) {
|
||||
this.wireFormat = wireFormat;
|
||||
}
|
||||
|
||||
public Command read(DatagramHeader header) throws IOException {
|
||||
if (expectedCounter != header.getCounter()) {
|
||||
// lets add it to the list for later on
|
||||
headers.add(header);
|
||||
|
||||
// lets see if the first item in the set is the next header
|
||||
header = (DatagramHeader) headers.first();
|
||||
if (expectedCounter != header.getCounter()) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// we've got a valid header so increment counter
|
||||
expectedCounter++;
|
||||
|
||||
Command answer = null;
|
||||
if (!header.isPartial()) {
|
||||
answer = header.getCommand();
|
||||
if (answer == null) {
|
||||
throw new IllegalStateException("The header should have a command!: " + header);
|
||||
}
|
||||
}
|
||||
else {
|
||||
byte[] data = header.getPartialData();
|
||||
out.write(data);
|
||||
|
||||
if (header.isComplete()) {
|
||||
answer = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
|
||||
out.reset();
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import org.apache.activemq.command.Command;
|
||||
|
||||
/**
|
||||
* Represents a header used when sending data grams
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class DatagramHeader implements Comparable {
|
||||
|
||||
private String producerId;
|
||||
private long counter;
|
||||
private boolean partial;
|
||||
private boolean complete;
|
||||
private int dataSize;
|
||||
|
||||
// transient caches
|
||||
private transient byte[] partialData;
|
||||
private transient Command command;
|
||||
|
||||
public int hashCode() {
|
||||
final int PRIME = 31;
|
||||
int result = 1;
|
||||
result = PRIME * result + (int) (counter ^ (counter >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final DatagramHeader other = (DatagramHeader) obj;
|
||||
if (counter != other.counter)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public int compareTo(DatagramHeader that) {
|
||||
return (int) (this.counter - that.counter);
|
||||
}
|
||||
|
||||
public int compareTo(Object that) {
|
||||
if (that instanceof DatagramHeader) {
|
||||
return compareTo((DatagramHeader) that);
|
||||
}
|
||||
return getClass().getName().compareTo(that.getClass().getName());
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return complete;
|
||||
}
|
||||
|
||||
public void setComplete(boolean complete) {
|
||||
this.complete = complete;
|
||||
}
|
||||
|
||||
public long getCounter() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
public void setCounter(long counter) {
|
||||
this.counter = counter;
|
||||
}
|
||||
|
||||
public boolean isPartial() {
|
||||
return partial;
|
||||
}
|
||||
|
||||
public void setPartial(boolean partial) {
|
||||
this.partial = partial;
|
||||
}
|
||||
|
||||
public String getProducerId() {
|
||||
return producerId;
|
||||
}
|
||||
|
||||
public void setProducerId(String producerId) {
|
||||
this.producerId = producerId;
|
||||
}
|
||||
|
||||
public int getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
public void setDataSize(int dataSize) {
|
||||
this.dataSize = dataSize;
|
||||
}
|
||||
|
||||
public void incrementCounter() {
|
||||
counter++;
|
||||
}
|
||||
|
||||
public byte getFlags() {
|
||||
byte answer = 0;
|
||||
if (partial) {
|
||||
answer |= 0x1;
|
||||
}
|
||||
if (complete) {
|
||||
answer |= 0x2;
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
public void setFlags(byte flags) {
|
||||
partial = (flags & 0x1) == 0;
|
||||
complete = (flags & 0x2) == 0;
|
||||
}
|
||||
|
||||
public Command getCommand() {
|
||||
return command;
|
||||
}
|
||||
|
||||
public void setCommand(Command command) {
|
||||
this.command = command;
|
||||
}
|
||||
|
||||
public byte[] getPartialData() {
|
||||
return partialData;
|
||||
}
|
||||
|
||||
public void setPartialData(byte[] partialData) {
|
||||
this.partialData = partialData;
|
||||
}
|
||||
|
||||
// Transient cached properties
|
||||
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class DatagramHeaderMarshaller {
|
||||
|
||||
public DatagramHeader readHeader(ByteBuffer readBuffer) {
|
||||
DatagramHeader answer = new DatagramHeader();
|
||||
answer.setCounter(readBuffer.getLong());
|
||||
byte flags = readBuffer.get();
|
||||
answer.setFlags(flags);
|
||||
return answer;
|
||||
}
|
||||
|
||||
public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
|
||||
writeBuffer.putLong(header.getCounter());
|
||||
writeBuffer.put(header.getFlags());
|
||||
}
|
||||
|
||||
public int getHeaderSize(DatagramHeader header) {
|
||||
return 8 + 1;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,195 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportThreadSupport;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
|
||||
/**
|
||||
* An implementation of the {@link Transport} interface using raw UDP
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
|
||||
private static final Log log = LogFactory.getLog(UdpTransport.class);
|
||||
|
||||
private CommandChannel commandChannel;
|
||||
private OpenWireFormat wireFormat;
|
||||
private ByteBufferPool bufferPool;
|
||||
private int datagramSize = 4 * 1024;
|
||||
private long maxInactivityDuration = 0; //30000;
|
||||
private InetSocketAddress socketAddress;
|
||||
private DatagramChannel channel;
|
||||
private boolean trace = false;
|
||||
private boolean useLocalHost = true;
|
||||
|
||||
protected UdpTransport(OpenWireFormat wireFormat) {
|
||||
this.wireFormat = wireFormat;
|
||||
}
|
||||
|
||||
public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
|
||||
this(wireFormat);
|
||||
this.socketAddress = createAddress(remoteLocation);
|
||||
}
|
||||
|
||||
public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) {
|
||||
this(wireFormat);
|
||||
this.socketAddress = socketAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* A one way asynchronous send
|
||||
*/
|
||||
public void oneway(Command command) throws IOException {
|
||||
checkStarted(command);
|
||||
commandChannel.write(command);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return pretty print of 'this'
|
||||
*/
|
||||
public String toString() {
|
||||
return "udp://" + socketAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* reads packets from a Socket
|
||||
*/
|
||||
public void run() {
|
||||
log.trace("TCP consumer thread starting");
|
||||
while (!isClosed()) {
|
||||
try {
|
||||
Command command = commandChannel.read();
|
||||
doConsume(command);
|
||||
}
|
||||
catch (SocketTimeoutException e) {
|
||||
}
|
||||
catch (InterruptedIOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
try {
|
||||
stop();
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
|
||||
}
|
||||
onException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
public boolean isTrace() {
|
||||
return trace;
|
||||
}
|
||||
|
||||
public void setTrace(boolean trace) {
|
||||
this.trace = trace;
|
||||
}
|
||||
|
||||
public long getMaxInactivityDuration() {
|
||||
return maxInactivityDuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum inactivity duration
|
||||
*/
|
||||
public void setMaxInactivityDuration(long maxInactivityDuration) {
|
||||
this.maxInactivityDuration = maxInactivityDuration;
|
||||
}
|
||||
public boolean isUseLocalHost() {
|
||||
return useLocalHost;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether 'localhost' or the actual local host name should be used to
|
||||
* make local connections. On some operating systems such as Macs its not
|
||||
* possible to connect as the local host name so localhost is better.
|
||||
*/
|
||||
public void setUseLocalHost(boolean useLocalHost) {
|
||||
this.useLocalHost = useLocalHost;
|
||||
}
|
||||
|
||||
|
||||
public CommandChannel getCommandChannel() {
|
||||
return commandChannel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the implementation of the command channel to use.
|
||||
*/
|
||||
public void setCommandChannel(CommandChannel commandChannel) {
|
||||
this.commandChannel = commandChannel;
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
||||
/**
|
||||
* Creates an address from the given URI
|
||||
*/
|
||||
protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
|
||||
String host = resolveHostName(remoteLocation.getHost());
|
||||
return new InetSocketAddress(host, remoteLocation.getPort());
|
||||
}
|
||||
|
||||
protected String resolveHostName(String host) throws UnknownHostException {
|
||||
String localName = InetAddress.getLocalHost().getHostName();
|
||||
if (localName != null && isUseLocalHost()) {
|
||||
if (localName.equals(host)) {
|
||||
return "localhost";
|
||||
}
|
||||
}
|
||||
return host;
|
||||
}
|
||||
|
||||
protected void doStart() throws Exception {
|
||||
if (socketAddress != null) {
|
||||
channel = DatagramChannel.open();
|
||||
channel.connect(socketAddress);
|
||||
}
|
||||
else if (channel == null) {
|
||||
throw new IllegalArgumentException("No channel configured");
|
||||
}
|
||||
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize);
|
||||
commandChannel.start();
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.net.ServerSocketFactory;
|
||||
import javax.net.SocketFactory;
|
||||
import org.activeio.command.WireFormat;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.transport.InactivityMonitor;
|
||||
import org.apache.activemq.transport.MutexTransport;
|
||||
import org.apache.activemq.transport.ResponseCorrelator;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportLogger;
|
||||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.WireFormatNegotiator;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.URISupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
public class UdpTransportFactory extends TransportFactory {
|
||||
private static final Log log = LogFactory.getLog(UdpTransportFactory.class);
|
||||
|
||||
public TransportServer doBind(String brokerId, final URI location) throws IOException {
|
||||
throw new IOException("TransportServer not supported for UDP");
|
||||
/*
|
||||
try {
|
||||
Map options = new HashMap(URISupport.parseParamters(location));
|
||||
|
||||
return null;
|
||||
UdpTransportServer server = new UdpTransportServer(location);
|
||||
server.setWireFormatFactory(createWireFormatFactory(options));
|
||||
IntrospectionSupport.setProperties(server, options);
|
||||
|
||||
return server;
|
||||
}
|
||||
catch (URISyntaxException e) {
|
||||
throw IOExceptionSupport.create(e);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
public Transport configure(Transport transport, WireFormat format, Map options) {
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
UdpTransport tcpTransport = (UdpTransport) transport;
|
||||
if (tcpTransport.isTrace()) {
|
||||
transport = new TransportLogger(transport);
|
||||
}
|
||||
|
||||
if (tcpTransport.getMaxInactivityDuration() > 0) {
|
||||
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
|
||||
}
|
||||
|
||||
transport = new MutexTransport(transport);
|
||||
transport = new ResponseCorrelator(transport);
|
||||
return transport;
|
||||
}
|
||||
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
UdpTransport tcpTransport = (UdpTransport) transport;
|
||||
if (tcpTransport.isTrace()) {
|
||||
transport = new TransportLogger(transport);
|
||||
}
|
||||
|
||||
if (tcpTransport.getMaxInactivityDuration() > 0) {
|
||||
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
|
||||
}
|
||||
return transport;
|
||||
}
|
||||
|
||||
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
|
||||
/*
|
||||
URI localLocation = null;
|
||||
String path = location.getPath();
|
||||
// see if the path is a local URI location
|
||||
if (path != null && path.length() > 0) {
|
||||
int localPortIndex = path.indexOf(':');
|
||||
try {
|
||||
Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
|
||||
String localString = location.getScheme() + ":/" + path;
|
||||
localLocation = new URI(localString);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("path isn't a valid local location for TcpTransport to use", e);
|
||||
}
|
||||
}
|
||||
if (localLocation != null) {
|
||||
return new UdpTransport(wf, location, localLocation);
|
||||
}
|
||||
*/
|
||||
return new UdpTransport((OpenWireFormat) wf, location);
|
||||
}
|
||||
|
||||
protected ServerSocketFactory createServerSocketFactory() {
|
||||
return ServerSocketFactory.getDefault();
|
||||
}
|
||||
|
||||
protected SocketFactory createSocketFactory() {
|
||||
return SocketFactory.getDefault();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
UDP based Transport implementation.
|
||||
|
||||
</body>
|
||||
</html>
|
Loading…
Reference in New Issue