Eliminating required dependency on activeio... http://issues.apache.org/activemq/browse/AMQ-907

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-09-01 00:13:23 +00:00
parent 1c0de6bda4
commit 88acb0ede7
82 changed files with 1245 additions and 292 deletions

View File

@ -223,7 +223,7 @@
<useFile>true</useFile>
<argLine>-Xmx512M</argLine>
<includes>
<include>**/*Test.*</include>
<include>**/*XTest.*</include>
</includes>
<excludes>

View File

@ -26,7 +26,6 @@ import javax.jms.Message;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import org.apache.activeio.Disposable;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;

View File

@ -26,7 +26,6 @@ import java.util.Map;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activeio.Disposable;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;

View File

@ -0,0 +1,29 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
/**
* @version $Revision$
*/
public interface Disposable {
/**
*/
void dispose();
}

View File

@ -20,7 +20,7 @@ package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
/**

View File

@ -28,10 +28,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.BrokerId;
@ -40,6 +36,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* A Broker interceptor which allows you to trace all operations to a UDP socket.

View File

@ -35,12 +35,11 @@ import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.packet.PacketData;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;
/**
@ -106,9 +105,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
dataOut.close();
ByteSequence bs = bytesOut.toByteSequence();
if( compressed ) {
// Prefix the real length
ByteArrayPacket packet = new ByteArrayPacket(bs);
PacketData.writeIntBig(packet, length);
int pos = bs.offset;
ByteSequenceData.writeIntBig(bs, length);
bs.offset = pos;
}
setContent(bs);
bytesOut = null;

View File

@ -34,13 +34,13 @@ import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* A <CODE>MapMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>

View File

@ -25,23 +25,20 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ClassLoading;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.apache.activemq.util.JMSExceptionSupport;
/**
@ -66,7 +63,7 @@ import org.apache.activemq.util.JMSExceptionSupport;
* @see javax.jms.TextMessage
*/
public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
static final private ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); //TODO verify classloader
static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); //TODO verify classloader
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
protected transient Serializable object;
@ -162,7 +159,7 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
is = new InflaterInputStream(is);
}
DataInputStream dataIn = new DataInputStream(is);
ObjectInputStreamExt objIn = new ObjectInputStreamExt(dataIn);
ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
try {
object = (Serializable) objIn.readObject();
} catch (ClassNotFoundException ce) {
@ -190,38 +187,4 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
}
return super.toString();
}
static public class ObjectInputStreamExt extends ObjectInputStream {
public ObjectInputStreamExt(InputStream in) throws IOException {
super(in);
}
protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return load(classDesc.getName(), cl);
}
protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Class[] cinterfaces = new Class[interfaces.length];
for (int i = 0; i < interfaces.length; i++)
cinterfaces[i] = load(interfaces[i], cl);
try {
return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
} catch (IllegalArgumentException e) {
throw new ClassNotFoundException(null, e);
}
}
private Class load(String className, ClassLoader cl) throws ClassNotFoundException {
try {
return ClassLoading.loadClass(className, cl);
} catch ( ClassNotFoundException e ) {
return ClassLoading.loadClass(className, ACTIVEMQ_CLASSLOADER);
}
}
}
}

View File

@ -35,10 +35,10 @@ import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import javax.jms.StreamMessage;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
/**

View File

@ -29,13 +29,13 @@ import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
*

View File

@ -19,8 +19,8 @@ package org.apache.activemq.command;
import java.io.IOException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
public interface MarshallAware {

View File

@ -24,14 +24,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* Represents an ActiveMQ message

View File

@ -25,12 +25,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
*

View File

@ -27,8 +27,8 @@ import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteArrayInputStream;
import org.xml.sax.InputSource;
public class JAXPXPathEvaluator implements XPathExpression.XPathEvaluator {

View File

@ -22,8 +22,8 @@ import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xmlbeans.XmlObject;
public class XMLBeansXPathEvaluator implements XPathExpression.XPathEvaluator {

View File

@ -26,8 +26,8 @@ import javax.jms.TextMessage;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xpath.CachedXPathAPI;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;

View File

@ -23,19 +23,17 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import org.apache.activeio.adapter.PacketToInputStream;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.packet.PacketData;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.ClassLoading;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.wireformat.WireFormat;
/**
*
@ -115,7 +113,7 @@ final public class OpenWireFormat implements WireFormat {
return version;
}
public Packet marshal(Object command) throws IOException {
public ByteSequence marshal(Object command) throws IOException {
if( cacheEnabled ) {
runMarshallCacheEvictionSweep();
@ -174,8 +172,9 @@ final public class OpenWireFormat implements WireFormat {
if( !sizePrefixDisabled ) {
size = sequence.getLength()-4;
ByteArrayPacket packet = new ByteArrayPacket(sequence);
PacketData.writeIntBig(packet, size);
int pos = sequence.offset;
ByteSequenceData.writeIntBig(sequence, size);
sequence.offset = pos;
}
}
@ -194,12 +193,11 @@ final public class OpenWireFormat implements WireFormat {
ma.setCachedMarshalledForm(this, sequence);
}
}
return new ByteArrayPacket(sequence);
return sequence;
}
public Object unmarshal(Packet packet) throws IOException {
ByteSequence sequence = packet.asByteSequence();
DataInputStream dis = new DataInputStream(new PacketToInputStream(packet));
public Object unmarshal(ByteSequence sequence) throws IOException {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(sequence));
if( !sizePrefixDisabled ) {
int size = dis.readInt();

View File

@ -17,9 +17,9 @@
*/
package org.apache.activemq.openwire;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* @version $Revision$

View File

@ -22,11 +22,11 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.DataStreamMarshaller;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoading;
abstract public class BaseDataStreamMarshaller implements DataStreamMarshaller {

View File

@ -22,11 +22,11 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.DataStreamMarshaller;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoading;
abstract public class BaseDataStreamMarshaller implements DataStreamMarshaller {

View File

@ -20,9 +20,6 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -31,7 +28,10 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.10 $
@ -56,8 +56,8 @@ public class JDBCMessageStore implements MessageStore {
// Serialize the Message..
byte data[];
try {
Packet packet = wireFormat.marshal(message);
data = packet.sliceAsBytes();
ByteSequence packet = wireFormat.marshal(message);
data = ByteSequenceData.toByteArray(packet);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
+ e, e);
@ -101,7 +101,7 @@ public class JDBCMessageStore implements MessageStore {
if (data == null)
return null;
Message answer = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
return answer;
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@ -153,7 +153,7 @@ public class JDBCMessageStore implements MessageStore {
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
}

View File

@ -22,8 +22,6 @@ import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecuto
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@ -37,7 +35,9 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -20,8 +20,6 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@ -29,7 +27,9 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.6 $
@ -69,7 +69,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
}

View File

@ -28,8 +28,8 @@ import java.sql.SQLException;
import javax.jms.JMSException;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;
/**

View File

@ -24,7 +24,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayInputStream;
/**
* This JDBCAdapter inserts and extracts BLOB data using the

View File

@ -23,11 +23,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -54,7 +54,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -429,8 +431,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
*/
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet data = journal.read(location);
return (DataStructure) wireFormat.unmarshal(data);
Packet packet = journal.read(location);
return (DataStructure) wireFormat.unmarshal(toByteSequence(packet));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@ -460,7 +462,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@ -586,7 +588,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
return journal.write(wireFormat.marshal(command), sync);
return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@ -610,7 +612,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location = journal.write(wireFormat.marshal(trace), false);
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@ -651,4 +653,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
throw new IllegalArgumentException("The journal does not support message references.");
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}

View File

@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -53,7 +53,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -429,7 +431,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet data = journal.read(location);
return (DataStructure) wireFormat.unmarshal(data);
return (DataStructure) wireFormat.unmarshal(toByteSequence(data));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@ -459,7 +461,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@ -584,7 +586,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
return journal.write(wireFormat.marshal(command), sync);
return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@ -608,7 +610,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location = journal.write(wireFormat.marshal(trace), false);
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@ -649,4 +651,13 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
throw new IllegalArgumentException("The journal does not support message references.");
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}

View File

@ -20,10 +20,10 @@ package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Marshall a Message or a MessageReference
@ -38,10 +38,9 @@ public class CommandMarshaller implements Marshaller{
}
public void writePayload(Object object,DataOutput dataOut) throws IOException{
Packet packet = wireFormat.marshal(object);
byte[] data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
ByteSequence packet = wireFormat.marshal(object);
dataOut.writeInt(packet.length);
dataOut.write(packet.data, packet.offset, packet.length);
}
@ -49,6 +48,6 @@ public class CommandMarshaller implements Marshaller{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
return wireFormat.unmarshal(new ByteArrayPacket(data));
return wireFormat.unmarshal(new ByteSequence(data));
}
}

View File

@ -22,11 +22,11 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Marshall a Transaction
@ -47,15 +47,13 @@ public class TransactionMarshaller implements Marshaller{
for (int i = 0; i < list.size(); i++){
TxCommand tx = (TxCommand) list.get(i);
Object key = tx.getMessageStoreKey();
Packet packet = wireFormat.marshal(key);
byte[] data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
ByteSequence packet = wireFormat.marshal(key);
dataOut.writeInt(packet.length);
dataOut.write(packet.data, packet.offset, packet.length);
Object command = tx.getCommand();
packet = wireFormat.marshal(command);
data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
dataOut.writeInt(packet.length);
dataOut.write(packet.data, packet.offset, packet.length);
}
}
@ -71,12 +69,12 @@ public class TransactionMarshaller implements Marshaller{
int size = dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
Object key = wireFormat.unmarshal(new ByteArrayPacket(data));
Object key = wireFormat.unmarshal(new ByteSequence(data));
command.setMessageStoreKey(key);
size = dataIn.readInt();
data=new byte[size];
dataIn.readFully(data);
BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteArrayPacket(data));
BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteSequence(data));
command.setCommand(bc);
list.add(command);
}

View File

@ -24,12 +24,12 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -61,7 +61,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -96,7 +98,6 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 5000;
@ -445,7 +446,7 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet data = journal.read(location);
return (DataStructure) wireFormat.unmarshal(data);
return (DataStructure) wireFormat.unmarshal(toByteSequence(data));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@ -475,7 +476,7 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@ -600,7 +601,7 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
return journal.write(wireFormat.marshal(command), sync);
return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@ -624,7 +625,7 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location = journal.write(wireFormat.marshal(trace), false);
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@ -670,4 +671,13 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
return store;
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}

View File

@ -21,11 +21,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -19,8 +19,8 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
import org.apache.activemq.wireformat.WireFormat;
public class MarshallingTransportFilter extends TransportFilter {

View File

@ -25,12 +25,12 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;

View File

@ -26,8 +26,6 @@ import java.util.Map;
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
import org.apache.activeio.adapter.SyncToAsyncChannel;
import org.apache.activeio.command.AsyncChannelToAsyncCommandChannel;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.stream.sync.socket.SocketMetadata;
import org.apache.activemq.openwire.OpenWireFormat;
@ -42,6 +40,7 @@ import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;

View File

@ -25,14 +25,14 @@ import java.util.Map;
import org.apache.activeio.AcceptListener;
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activeio.packet.async.AsyncChannelServer;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;

View File

@ -0,0 +1,97 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.activeio;
import java.io.EOFException;
import java.io.IOException;
import org.apache.activeio.command.AsyncCommandChannel;
import org.apache.activeio.command.CommandListener;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.EOSPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.packet.async.AsyncChannelListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.1 $
*/
public class AsyncChannelToAsyncCommandChannel implements AsyncCommandChannel {
private AsyncChannel channel;
private WireFormat wireFormat;
public AsyncChannelToAsyncCommandChannel(AsyncChannel channel, WireFormat wireFormat) {
this.channel = channel;
this.wireFormat = wireFormat;
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
public void writeCommand(Object command) throws IOException {
ByteSequence sequence = wireFormat.marshal(command);
channel.write(toPacket(sequence));
channel.flush();
}
public Object getAdapter(Class target) {
return channel.getAdapter(target);
}
public void dispose() {
channel.dispose();
}
public void start() throws IOException {
channel.start();
}
public void stop() throws IOException {
channel.stop();
}
public void setCommandListener(final CommandListener listener) {
channel.setAsyncChannelListener(new AsyncChannelListener() {
public void onPacket(Packet packet) {
if( packet == EOSPacket.EOS_PACKET ) {
listener.onError(new EOFException("Peer disconnected."));
return;
}
try {
Object command = wireFormat.unmarshal(toByteSequence(packet));
listener.onCommand(command);
}
catch (IOException e) {
listener.onError(e);
}
}
public void onPacketError(IOException error) {
listener.onError(error);
}
});
}
}

View File

@ -20,7 +20,7 @@ package org.apache.activemq.transport.discovery;
import java.io.IOException;
import java.net.URI;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;

View File

@ -21,10 +21,10 @@ import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.UdpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
/**
* A factory of multicast transport classes

View File

@ -27,7 +27,6 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -49,6 +48,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;

View File

@ -19,9 +19,9 @@ package org.apache.activemq.transport.stomp;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory

View File

@ -24,12 +24,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activeio.adapter.PacketInputStream;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
@ -46,16 +44,16 @@ public class StompWireFormat implements WireFormat {
private int version=1;
public Packet marshal(Object command) throws IOException {
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return new ByteArrayPacket(baos.toByteSequence());
return baos.toByteSequence();
}
public Object unmarshal(Packet packet) throws IOException {
PacketInputStream stream = new PacketInputStream(packet);
public Object unmarshal(ByteSequence packet) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}

View File

@ -17,8 +17,8 @@
*/
package org.apache.activemq.transport.stomp;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.

View File

@ -33,13 +33,13 @@ import java.util.Map;
import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -27,7 +27,6 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
@ -38,6 +37,7 @@ import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -29,8 +29,6 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
@ -38,6 +36,8 @@ import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -24,8 +24,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
@ -33,6 +31,8 @@ import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -24,8 +24,6 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
@ -33,6 +31,8 @@ import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -24,7 +24,6 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
@ -40,6 +39,7 @@ import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
public class UdpTransportFactory extends TransportFactory {

View File

@ -0,0 +1,100 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
/**
* Very similar to the java.io.ByteArrayInputStream but this version
* is not thread safe.
*/
public class ByteArrayInputStream extends InputStream {
byte buffer[];
int limit;
int pos;
int mark;
public ByteArrayInputStream(byte data[]) {
this(data, 0, data.length);
}
public ByteArrayInputStream(ByteSequence sequence) {
this(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
public ByteArrayInputStream(byte data[], int offset, int size) {
this.buffer = data;
this.mark= this.pos = offset;
this.limit = offset+size;
}
public int read() throws IOException {
if( pos < limit )
return buffer[pos++] & 0xff;
else
return -1;
}
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
public int read(byte b[], int off, int len) {
if (pos < limit) {
len = Math.min(len, limit-pos);
if (len > 0) {
System.arraycopy(buffer, pos, b, off, len);
pos += len;
}
return len;
} else {
return -1;
}
}
public long skip(long len) throws IOException {
if (pos < limit) {
len = Math.min(len, limit-pos);
if (len > 0) {
pos += len;
}
return len;
} else {
return -1;
}
}
public int available() {
return limit - pos;
}
public boolean markSupported() {
return true;
}
public void mark(int markpos) {
mark = pos;
}
public void reset() {
pos = mark;
}
}

View File

@ -0,0 +1,83 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.OutputStream;
/**
* Very similar to the java.io.ByteArrayOutputStream but this version
* is not thread safe and the resulting data is returned in a ByteSequence
* to avoid an extra byte[] allocation.
*/
public class ByteArrayOutputStream extends OutputStream {
byte buffer[];
int size;
public ByteArrayOutputStream() {
this(512);
}
public ByteArrayOutputStream(int capacity) {
buffer = new byte[capacity];
}
public void write(int b) {
int newsize = size + 1;
checkCapacity(newsize);
buffer[size] = (byte) b;
size = newsize;
}
public void write(byte b[], int off, int len) {
int newsize = size + len;
checkCapacity(newsize);
System.arraycopy(b, off, buffer, size, len);
size = newsize;
}
/**
* Ensures the the buffer has at least the minimumCapacity specified.
* @param i
*/
private void checkCapacity(int minimumCapacity) {
if (minimumCapacity > buffer.length) {
byte b[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
System.arraycopy(buffer, 0, b, 0, size);
buffer = b;
}
}
public void reset() {
size = 0;
}
public ByteSequence toByteSequence() {
return new ByteSequence(buffer, 0, size);
}
public byte[] toByteArray() {
byte rc[] = new byte[size];
System.arraycopy(buffer, 0, rc, 0, size);
return rc;
}
public int size() {
return size;
}
}

View File

@ -0,0 +1,58 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
public class ByteSequence {
public byte[] data;
public int offset;
public int length;
public ByteSequence(byte data[]) {
this.data = data;
this.offset = 0;
this.length = data.length;
}
public ByteSequence(byte data[], int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
}
public byte[] getData() {
return data;
}
public int getLength() {
return length;
}
public int getOffset() {
return offset;
}
public void setData(byte[] data) {
this.data = data;
}
public void setLength(int length) {
this.length = length;
}
public void setOffset(int offset) {
this.offset = offset;
}
}

View File

@ -0,0 +1,267 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
/**
* Used to write and read primitives to and from a ByteSequence.
*/
final public class ByteSequenceData {
public static byte[] toByteArray(ByteSequence packet) {
if( packet.offset==0 && packet.length == packet.data.length )
return packet.data;
byte rc[] = new byte[packet.length];
System.arraycopy(packet.data, packet.offset, rc, 0, packet.length);
return rc;
}
private static void spaceNeeded(ByteSequence packet, int i) {
assert packet.offset+i <= packet.length;
}
public static int remaining(ByteSequence packet) {
return packet.length - packet.offset;
}
public static int read(ByteSequence packet) {
return packet.data[packet.offset++] & 0xff;
}
public static void readFully(ByteSequence packet, byte[] b) throws IOException {
readFully(packet, b, 0, b.length);
}
public static void readFully(ByteSequence packet, byte[] b, int off, int len) throws IOException {
spaceNeeded(packet, len);
System.arraycopy(packet.data, packet.offset, b, off, len);
packet.offset += len;
}
public static int skipBytes(ByteSequence packet, int n) throws IOException {
int rc = Math.min(n, remaining(packet));
packet.offset += rc;
return rc;
}
public static boolean readBoolean(ByteSequence packet) throws IOException {
spaceNeeded(packet, 1);
return read(packet) != 0;
}
public static byte readByte(ByteSequence packet) throws IOException {
spaceNeeded(packet, 1);
return (byte) read(packet);
}
public static int readUnsignedByte(ByteSequence packet) throws IOException {
spaceNeeded(packet, 1);
return read(packet);
}
public static short readShortBig(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return (short) ((read(packet) << 8) + (read(packet) << 0));
}
public static short readShortLittle(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return (short) ((read(packet) << 0) + (read(packet) << 8) );
}
public static int readUnsignedShortBig(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return ((read(packet) << 8) + (read(packet) << 0));
}
public static int readUnsignedShortLittle(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return ((read(packet) << 0) + (read(packet) << 8) );
}
public static char readCharBig(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return (char) ((read(packet) << 8) + (read(packet) << 0));
}
public static char readCharLittle(ByteSequence packet) throws IOException {
spaceNeeded(packet, 2);
return (char) ((read(packet) << 0) + (read(packet) << 8) );
}
public static int readIntBig(ByteSequence packet) throws IOException {
spaceNeeded(packet, 4);
return ((read(packet) << 24) +
(read(packet) << 16) +
(read(packet) << 8) +
(read(packet) << 0));
}
public static int readIntLittle(ByteSequence packet) throws IOException {
spaceNeeded(packet, 4);
return ((read(packet) << 0) +
(read(packet) << 8) +
(read(packet) << 16) +
(read(packet) << 24));
}
public static long readLongBig(ByteSequence packet) throws IOException {
spaceNeeded(packet, 8);
return (((long) read(packet) << 56) +
((long) read(packet) << 48) +
((long) read(packet) << 40) +
((long) read(packet) << 32) +
((long) read(packet) << 24) +
((read(packet)) << 16) +
((read(packet)) << 8) +
((read(packet)) << 0));
}
public static long readLongLittle(ByteSequence packet) throws IOException {
spaceNeeded(packet, 8);
return ((read(packet) << 0) +
(read(packet) << 8) +
(read(packet) << 16) +
((long) read(packet) << 24) +
((long) read(packet) << 32) +
((long) read(packet) << 40) +
((long) read(packet) << 48) +
((long) read(packet) << 56));
}
public static double readDoubleBig(ByteSequence packet) throws IOException {
return Double.longBitsToDouble(readLongBig(packet));
}
public static double readDoubleLittle(ByteSequence packet) throws IOException {
return Double.longBitsToDouble(readLongLittle(packet));
}
public static float readFloatBig(ByteSequence packet) throws IOException {
return Float.intBitsToFloat(readIntBig(packet));
}
public static float readFloatLittle(ByteSequence packet) throws IOException {
return Float.intBitsToFloat(readIntLittle(packet));
}
public static void write(ByteSequence packet, int b) throws IOException {
spaceNeeded(packet, 1);
packet.data[packet.offset++] = (byte) b;
}
public static void write(ByteSequence packet, byte[] b) throws IOException {
write(packet, b, 0, b.length);
}
public static void write(ByteSequence packet, byte[] b, int off, int len) throws IOException {
spaceNeeded(packet, len);
System.arraycopy(b, off, packet.data, packet.offset, len);
packet.offset += len;
}
public static void writeBoolean(ByteSequence packet, boolean v) throws IOException {
spaceNeeded(packet, 1);
write(packet,v ? 1 : 0);
}
public static void writeByte(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 1);
write(packet,v);
}
public static void writeShortBig(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 2);
write(packet,(v >>> 8) & 0xFF);
write(packet,(v >>> 0) & 0xFF);
}
public static void writeShortLittle(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 2);
write(packet,(v >>> 0) & 0xFF);
write(packet,(v >>> 8) & 0xFF);
}
public static void writeCharBig(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 2);
write(packet,(v >>> 8) & 0xFF);
write(packet,(v >>> 0) & 0xFF);
}
public static void writeCharLittle(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 2);
write(packet,(v >>> 0) & 0xFF);
write(packet,(v >>> 8) & 0xFF);
}
public static void writeIntBig(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 4);
write(packet,(v >>> 24) & 0xFF);
write(packet,(v >>> 16) & 0xFF);
write(packet,(v >>> 8) & 0xFF);
write(packet,(v >>> 0) & 0xFF);
}
public static void writeIntLittle(ByteSequence packet, int v) throws IOException {
spaceNeeded(packet, 4);
write(packet,(v >>> 0) & 0xFF);
write(packet,(v >>> 8) & 0xFF);
write(packet,(v >>> 16) & 0xFF);
write(packet,(v >>> 24) & 0xFF);
}
public static void writeLongBig(ByteSequence packet, long v) throws IOException {
spaceNeeded(packet, 8);
write(packet,(int) (v >>> 56) & 0xFF);
write(packet,(int) (v >>> 48) & 0xFF);
write(packet,(int) (v >>> 40) & 0xFF);
write(packet,(int) (v >>> 32) & 0xFF);
write(packet,(int) (v >>> 24) & 0xFF);
write(packet,(int) (v >>> 16) & 0xFF);
write(packet,(int) (v >>> 8) & 0xFF);
write(packet,(int) (v >>> 0) & 0xFF);
}
public static void writeLongLittle(ByteSequence packet, long v) throws IOException {
spaceNeeded(packet, 8);
write(packet,(int) (v >>> 0) & 0xFF);
write(packet,(int) (v >>> 8) & 0xFF);
write(packet,(int) (v >>> 16) & 0xFF);
write(packet,(int) (v >>> 24) & 0xFF);
write(packet,(int) (v >>> 32) & 0xFF);
write(packet,(int) (v >>> 40) & 0xFF);
write(packet,(int) (v >>> 48) & 0xFF);
write(packet,(int) (v >>> 56) & 0xFF);
}
public static void writeDoubleBig(ByteSequence packet, double v) throws IOException {
writeLongBig(packet, Double.doubleToLongBits(v));
}
public static void writeDoubleLittle(ByteSequence packet, double v) throws IOException {
writeLongLittle(packet, Double.doubleToLongBits(v));
}
public static void writeFloatBig(ByteSequence packet, float v) throws IOException {
writeIntBig(packet, Float.floatToIntBits(v));
}
public static void writeFloatLittle(ByteSequence packet, float v) throws IOException {
writeIntLittle(packet, Float.floatToIntBits(v));
}
public static void writeRawDoubleBig(ByteSequence packet, double v) throws IOException {
writeLongBig(packet, Double.doubleToRawLongBits(v));
}
public static void writeRawDoubleLittle(ByteSequence packet, double v) throws IOException {
writeLongLittle(packet, Double.doubleToRawLongBits(v));
}
public static void writeRawFloatBig(ByteSequence packet, float v) throws IOException {
writeIntBig(packet, Float.floatToRawIntBits(v));
}
public static void writeRawFloatLittle(ByteSequence packet, float v) throws IOException {
writeIntLittle(packet, Float.floatToRawIntBits(v));
}
}

View File

@ -0,0 +1,47 @@
/**
*
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Proxy;
public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
super(in);
}
protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return load(classDesc.getName(), cl);
}
protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Class[] cinterfaces = new Class[interfaces.length];
for (int i = 0; i < interfaces.length; i++)
cinterfaces[i] = load(interfaces[i], cl);
try {
return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
} catch (IllegalArgumentException e) {
throw new ClassNotFoundException(null, e);
}
}
private Class load(String className, ClassLoader cl) throws ClassNotFoundException {
try {
return ClassLoading.loadClass(className, cl);
} catch ( ClassNotFoundException e ) {
return ClassLoading.loadClass(className, FALLBACK_CLASS_LOADER);
}
}
}

View File

@ -0,0 +1,106 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class FactoryFinder {
private final String path;
private final ConcurrentHashMap classMap = new ConcurrentHashMap();
public FactoryFinder(String path) {
this.path = path;
}
/**
* Creates a new instance of the given key
*
* @param key is the key to add to the path to find a text file
* containing the factory name
* @return a newly created instance
*/
public Object newInstance(String key)
throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException
{
return newInstance(key, null);
}
public Object newInstance(String key, String propertyPrefix)
throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException
{
if (propertyPrefix == null)
propertyPrefix = "";
Class clazz = (Class) classMap.get(propertyPrefix + key);
if (clazz == null) {
clazz = newInstance(doFindFactoryProperies(key), propertyPrefix);
classMap.put(propertyPrefix + key, clazz);
}
return clazz.newInstance();
}
private Class newInstance(Properties properties, String propertyPrefix) throws ClassNotFoundException, IOException {
String className = properties.getProperty(propertyPrefix + "class");
if (className == null) {
throw new IOException("Expected property is missing: " + propertyPrefix + "class");
}
Class clazz;
try {
clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
} catch (ClassNotFoundException e) {
clazz = FactoryFinder.class.getClassLoader().loadClass(className);
}
return clazz;
}
private Properties doFindFactoryProperies(String key) throws IOException {
String uri = path + key;
// lets try the thread context class loader first
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(uri);
if (in == null) {
in = FactoryFinder.class.getClassLoader().getResourceAsStream(uri);
if (in == null) {
throw new IOException("Could not find factory class for resource: " + uri);
}
}
// lets load the file
BufferedInputStream reader = null;
try {
reader = new BufferedInputStream(in);
Properties properties = new Properties();
properties.load(reader);
return properties;
} finally {
try {
reader.close();
} catch (Exception e) {
}
}
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.wireformat;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
/**
* A simple implementation which uses Object Stream serialization.
*
* @version $Revision: 1.1 $
*/
public class ObjectStreamWireFormat implements WireFormat {
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(baos);
marshal(command, ds);
ds.close();
return baos.toByteSequence();
}
public Object unmarshal(ByteSequence packet) throws IOException {
return unmarshal(new DataInputStream(new ByteArrayInputStream(packet)));
}
public void marshal(Object command, DataOutputStream ds) throws IOException {
ObjectOutputStream out = new ObjectOutputStream(ds);
out.writeObject(command);
out.flush();
out.reset();
}
public Object unmarshal(DataInputStream ds) throws IOException {
try {
ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(ds);
Object command;
command = in.readObject();
in.close();
return command;
} catch (ClassNotFoundException e) {
throw (IOException)new IOException("unmarshal failed: "+e).initCause(e);
}
}
public void setVersion(int version) {
}
public int getVersion() {
return 0;
}
}

View File

@ -0,0 +1,65 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.wireformat;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.util.ByteSequence;
/**
* Provides a mechanism to marshal commands into and out of packets
* or into and out of streams, Channels and Datagrams.
*
* @version $Revision: 1.1 $
*/
public interface WireFormat {
/**
* Packet based marshaling
*/
ByteSequence marshal(Object command) throws IOException;
/**
* Packet based un-marshaling
*/
Object unmarshal(ByteSequence packet) throws IOException;
/**
* Stream based marshaling
*/
void marshal(Object command, DataOutputStream out) throws IOException;
/**
* Packet based un-marshaling
*/
Object unmarshal(DataInputStream in) throws IOException;
/**
* @param the version of the wire format
*/
public void setVersion(int version);
/**
* @return the version of the wire format
*/
public int getVersion();
}

View File

@ -0,0 +1,22 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.wireformat;
public interface WireFormatFactory {
WireFormat createWireFormat();
}

View File

@ -0,0 +1,27 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
An API for WireFormats which are used to turn object into bytes and bytes into objects.
</p>
</body>
</html>

View File

@ -21,10 +21,10 @@ import java.io.IOException;
import junit.framework.Test;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
/**
* Runs against the broker but marshals all request and response commands.

View File

@ -28,10 +28,10 @@ import javax.jms.MessageNotWriteableException;
import junit.framework.TestCase;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision$

View File

@ -26,7 +26,7 @@ import javax.jms.MessageNotWriteableException;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.util.ByteSequence;
/**
* @version $Revision$

View File

@ -25,10 +25,10 @@ import java.util.Arrays;
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
public abstract class DataStructureTestSupport extends CombinationTestSupport {
public boolean cacheEnabled;
@ -158,7 +158,7 @@ public abstract class DataStructureTestSupport extends CombinationTestSupport {
}
protected Object marshalAndUnmarshall(Object original, WireFormat wireFormat) throws IOException {
Packet packet = wireFormat.marshal(original);
ByteSequence packet = wireFormat.marshal(original);
return wireFormat.unmarshal(packet);
}

View File

@ -21,7 +21,7 @@ import java.io.IOException;
import junit.framework.Test;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.util.ByteSequence;
public class MessageSendTest extends DataStructureTestSupport {

View File

@ -22,8 +22,6 @@ import java.util.List;
import junit.framework.TestCase;
import org.apache.activeio.command.DefaultWireFormat;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@ -43,6 +41,8 @@ import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.activeio.ActiveIOTransportServer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.ObjectStreamWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -132,7 +132,7 @@ public class ConfigTest extends TestCase {
assertTrue("Should have created an EmbeddedDataSource",
((JDBCPersistenceAdapter)adapter).getDataSource() instanceof EmbeddedDataSource);
assertTrue("Should have created a DefaultWireFormat",
((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof DefaultWireFormat);
((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof ObjectStreamWireFormat);
log.info("Success");
} finally {

View File

@ -35,7 +35,6 @@ import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -57,6 +56,7 @@ import org.apache.activemq.openwire.v1.ActiveMQTextMessageTest;
import org.apache.activemq.openwire.v1.BrokerInfoTest;
import org.apache.activemq.openwire.v1.MessageAckTest;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.util.ByteSequence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -60,13 +60,13 @@ public abstract class MessageTestSupport extends BaseCommandTestSupport {
{
byte data[] = "Content:11".getBytes();
info.setContent(new org.apache.activeio.packet.ByteSequence(data,0,data.length));
info.setContent(new org.apache.activemq.util.ByteSequence(data,0,data.length));
}
{
byte data[] = "MarshalledProperties:12".getBytes();
info.setMarshalledProperties(new org.apache.activeio.packet.ByteSequence(data,0,data.length));
info.setMarshalledProperties(new org.apache.activemq.util.ByteSequence(data,0,data.length));
}
info.setDataStructure(createDataStructure("DataStructure:13"));

View File

@ -55,7 +55,7 @@ public class WireFormatInfoTest extends DataFileGeneratorTestSupport {
{
byte data[] = "MarshalledProperties:1".getBytes();
info.setMarshalledProperties(new org.apache.activeio.packet.ByteSequence(data,0,data.length));
info.setMarshalledProperties(new org.apache.activemq.util.ByteSequence(data,0,data.length));
}

View File

@ -1 +1 @@
/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v2; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for Message * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public abstract class MessageTestSupport extends BaseCommandTestSupport { protected void populateObject(Object object) throws Exception { super.populateObject(object); Message info = (Message) object; info.setProducerId(createProducerId("ProducerId:1")); info.setDestination(createActiveMQDestination("Destination:2")); info.setTransactionId(createTransactionId("TransactionId:3")); info.setOriginalDestination(createActiveMQDestination("OriginalDestination:4")); info.setMessageId(createMessageId("MessageId:5")); info.setOriginalTransactionId(createTransactionId("OriginalTransactionId:6")); info.setGroupID("GroupID:7"); info.setGroupSequence(1); info.setCorrelationId("CorrelationId:8"); info.setPersistent(true); info.setExpiration(1); info.setPriority((byte) 1); info.setReplyTo(createActiveMQDestination("ReplyTo:9")); info.setTimestamp(2); info.setType("Type:10"); { byte data[] = "Content:11".getBytes(); info.setContent(new org.apache.activeio.packet.ByteSequence(data,0,data.length)); } { byte data[] = "MarshalledProperties:12".getBytes(); info.setMarshalledProperties(new org.apache.activeio.packet.ByteSequence(data,0,data.length)); } info.setDataStructure(createDataStructure("DataStructure:13")); info.setTargetConsumerId(createConsumerId("TargetConsumerId:14")); info.setCompressed(false); info.setRedeliveryCounter(2); { BrokerId value[] = new BrokerId[2]; for( int i=0; i < 2; i++ ) { value[i] = createBrokerId("BrokerPath:15"); } info.setBrokerPath(value); } info.setArrival(3); info.setUserID("UserID:16"); info.setRecievedByDFBridge(true); } }
/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v2; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for Message * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public abstract class MessageTestSupport extends BaseCommandTestSupport { protected void populateObject(Object object) throws Exception { super.populateObject(object); Message info = (Message) object; info.setProducerId(createProducerId("ProducerId:1")); info.setDestination(createActiveMQDestination("Destination:2")); info.setTransactionId(createTransactionId("TransactionId:3")); info.setOriginalDestination(createActiveMQDestination("OriginalDestination:4")); info.setMessageId(createMessageId("MessageId:5")); info.setOriginalTransactionId(createTransactionId("OriginalTransactionId:6")); info.setGroupID("GroupID:7"); info.setGroupSequence(1); info.setCorrelationId("CorrelationId:8"); info.setPersistent(true); info.setExpiration(1); info.setPriority((byte) 1); info.setReplyTo(createActiveMQDestination("ReplyTo:9")); info.setTimestamp(2); info.setType("Type:10"); { byte data[] = "Content:11".getBytes(); info.setContent(new org.apache.activemq.util.ByteSequence(data,0,data.length)); } { byte data[] = "MarshalledProperties:12".getBytes(); info.setMarshalledProperties(new org.apache.activemq.util.ByteSequence(data,0,data.length)); } info.setDataStructure(createDataStructure("DataStructure:13")); info.setTargetConsumerId(createConsumerId("TargetConsumerId:14")); info.setCompressed(false); info.setRedeliveryCounter(2); { BrokerId value[] = new BrokerId[2]; for( int i=0; i < 2; i++ ) { value[i] = createBrokerId("BrokerPath:15"); } info.setBrokerPath(value); } info.setArrival(3); info.setUserID("UserID:16"); info.setRecievedByDFBridge(true); } }

View File

@ -1 +1 @@
/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v2; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for WireFormatInfo * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class WireFormatInfoTest extends DataFileGeneratorTestSupport { public static WireFormatInfoTest SINGLETON = new WireFormatInfoTest(); public Object createObject() throws Exception { WireFormatInfo info = new WireFormatInfo(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); WireFormatInfo info = (WireFormatInfo) object; info.setVersion(1); { byte data[] = "MarshalledProperties:1".getBytes(); info.setMarshalledProperties(new org.apache.activeio.packet.ByteSequence(data,0,data.length)); } } }
/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v2; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for WireFormatInfo * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class WireFormatInfoTest extends DataFileGeneratorTestSupport { public static WireFormatInfoTest SINGLETON = new WireFormatInfoTest(); public Object createObject() throws Exception { WireFormatInfo info = new WireFormatInfo(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); WireFormatInfo info = (WireFormatInfo) object; info.setVersion(1); { byte data[] = "MarshalledProperties:1".getBytes(); info.setMarshalledProperties(new org.apache.activemq.util.ByteSequence(data,0,data.length)); } } }

View File

@ -26,6 +26,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@ -73,7 +74,10 @@ public class HttpClientTransport extends HttpTransportSupport {
PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
configureMethod(httpMethod);
httpMethod.setRequestBody(getTextWireFormat().toString(command));
String data = getTextWireFormat().marshalText(command);
byte[] bytes = data.getBytes("UTF-8");
httpMethod.setRequestBody(new ByteArrayInputStream(bytes));
try {
HttpClient client = getSendHttpClient();
@ -127,8 +131,7 @@ public class HttpClientTransport extends HttpTransportSupport {
else {
// checkSession(httpMethod);
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
Command command = getTextWireFormat().readCommand(stream);
Command command = (Command) getTextWireFormat().unmarshal(stream);
if (command == null) {
log.warn("Received null command from url: " + remoteUrl);
} else {

View File

@ -29,7 +29,10 @@ import org.springframework.core.io.ClassPathResource;
* @version $Revision$
*/
public class HttpSpringEmbeddedTunnelServlet extends HttpEmbeddedTunnelServlet {
/**
private static final long serialVersionUID = -6568661997192814908L;
/**
* Factory method to create a new broker
*/
protected BrokerService createBroker() throws Exception {

View File

@ -17,17 +17,8 @@
*/
package org.apache.activemq.transport.http;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
@ -35,6 +26,17 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*/
@ -66,7 +68,7 @@ public class HttpTransport extends HttpTransportSupport {
}
HttpURLConnection connection = getSendConnection();
String text = getTextWireFormat().toString(command);
String text = getTextWireFormat().marshalText(command);
Writer writer = new OutputStreamWriter(connection.getOutputStream());
writer.write(text);
writer.flush();
@ -98,7 +100,18 @@ public class HttpTransport extends HttpTransportSupport {
}
else {
// checkSession(connection);
Command command = getTextWireFormat().readCommand(new DataInputStream(connection.getInputStream()));
// Create a String for the UTF content
InputStream is = connection.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength()>0?connection.getContentLength():1024);
int c=0;
while( (c=is.read())>= 0 ) {
baos.write(c);
}
ByteSequence sequence = baos.toByteSequence();
String data = new String(sequence.data, sequence.offset, sequence.length, "UTF-8");
Command command = (Command) getTextWireFormat().unmarshalText(data);
if (command == null) {
log.warn("Received null packet from url: " + remoteUrl);

View File

@ -21,13 +21,13 @@ import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -99,7 +99,7 @@ public class HttpTunnelServlet extends HttpServlet {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// Read the command directly from the reader
Command command = wireFormat.readCommand(request.getReader());
Command command = (Command) wireFormat.unmarshalText(request.getReader());
if (command instanceof WireFormatInfo) {
WireFormatInfo info = (WireFormatInfo) command;

View File

@ -21,10 +21,10 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.http.HttpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
/**
* Factory of HTTPS based transports

View File

@ -18,11 +18,14 @@
package org.apache.activemq.transport.util;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Reader;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Adds the extra methods available to text based wire format implementations
@ -31,14 +34,31 @@ import org.apache.activemq.command.Command;
*/
public abstract class TextWireFormat implements WireFormat {
public abstract Command readCommand(String text);
public abstract Object unmarshalText(String text);
public abstract Object unmarshalText(Reader reader);
public abstract String marshalText(Object command);
public abstract Command readCommand(Reader reader);
public abstract String toString(Command command);
public Command readCommand(DataInputStream in) throws IOException {
String text = in.readUTF();
return readCommand(text);
public void marshal(Object command, DataOutputStream out) throws IOException {
out.writeUTF(marshalText(command));
}
public Object unmarshal(DataInputStream in) throws IOException {
String text = in.readUTF();
return unmarshalText(text);
}
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return baos.toByteSequence();
}
public Object unmarshal(ByteSequence packet) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
}

View File

@ -17,19 +17,11 @@
*/
package org.apache.activemq.transport.xstream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Reader;
import javax.jms.JMSException;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import com.thoughtworks.xstream.XStream;
@ -44,23 +36,6 @@ public class XStreamWireFormat extends TextWireFormat {
private XStream xStream;
private int version;
public void marshal(Object command, DataOutputStream out) throws IOException {
String text = getXStream().toXML(command);
out.writeUTF(text);
}
public Packet marshal(Object command) throws IOException {
return null;
}
public Object unmarshal(DataInputStream arg0) throws IOException {
return null;
}
public Object unmarshal(Packet arg0) throws IOException {
return null;
}
public int getVersion() {
return version;
}
@ -69,43 +44,21 @@ public class XStreamWireFormat extends TextWireFormat {
this.version = version;
}
public Packet readPacket(DataInput in) throws IOException {
String text = in.readUTF();
return (Packet) getXStream().fromXML(text);
}
public Packet readPacket(int firstByte, DataInput in) throws IOException {
String text = in.readUTF();
return (Packet) getXStream().fromXML(text);
}
public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
String text = getXStream().toXML(packet);
out.writeUTF(text);
return null;
}
public WireFormat copy() {
return new XStreamWireFormat();
}
public String toString(Packet packet) {
return getXStream().toXML(packet);
}
public Packet fromString(String xml) {
return (Packet) getXStream().fromXML(xml);
}
public Command readCommand(String text) {
public Object unmarshalText(String text) {
return (Command) getXStream().fromXML(text);
}
public Command readCommand(Reader reader) {
public Object unmarshalText(Reader reader) {
return (Command) getXStream().fromXML(reader);
}
public String toString(Command command) {
public String marshalText(Object command) {
return getXStream().toXML(command);
}
@ -146,4 +99,5 @@ public class XStreamWireFormat extends TextWireFormat {
return new XStream();
}
}

View File

@ -17,8 +17,8 @@
*/
package org.apache.activemq.transport.xstream;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
*

View File

@ -17,9 +17,9 @@
*/
package org.apache.activemq.transport.xstream;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.MessageTest;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +34,7 @@ public class XStreamWireFormatTest extends MessageTest {
public void assertBeanMarshalls(Object original) throws IOException {
super.assertBeanMarshalls(original);
String xml = getXStreamWireFormat().toString((Command) original);
String xml = getXStreamWireFormat().marshalText((Command) original);
log.info(original.getClass().getName() + " as XML is:");
log.info(xml);
}

View File

@ -16,8 +16,8 @@
*/
package org.apache.activemq.web;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.web.view.MessageRenderer;

View File

@ -22,8 +22,6 @@ import java.util.List;
import junit.framework.TestCase;
import org.apache.activeio.command.DefaultWireFormat;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@ -43,6 +41,8 @@ import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.activeio.ActiveIOTransportServer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.ObjectStreamWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -132,7 +132,7 @@ public class ConfigTest extends TestCase {
assertTrue("Should have created an EmbeddedDataSource",
((JDBCPersistenceAdapter)adapter).getDataSource() instanceof EmbeddedDataSource);
assertTrue("Should have created a DefaultWireFormat",
((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof DefaultWireFormat);
((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof ObjectStreamWireFormat);
log.info("Success");
} finally {