Fixing eol-style

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@426583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-28 16:00:21 +00:00
parent a23f329fb7
commit d44765df12
4 changed files with 562 additions and 562 deletions

View File

@ -1,270 +1,270 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string.h>
#include "activemq/command/AbstractCommand.hpp"
#include "activemq/command/ActiveMQMessage.hpp"
#include "activemq/command/ActiveMQBytesMessage.hpp"
#include "activemq/command/ActiveMQMapMessage.hpp"
#include "activemq/command/ActiveMQObjectMessage.hpp"
#include "activemq/command/ActiveMQStreamMessage.hpp"
#include "activemq/command/ActiveMQTextMessage.hpp"
#include "activemq/command/ActiveMQQueue.hpp"
#include "activemq/command/ConsumerId.hpp"
#include "activemq/command/ProducerId.hpp"
#include "activemq/command/MessageId.hpp"
#include "activemq/command/LocalTransactionId.hpp"
#include "activemq/command/MessageAck.hpp"
#include "activemq/command/MessageDispatch.hpp"
#include "activemq/command/Response.hpp"
#include "activemq/command/ConsumerInfo.hpp"
#include "activemq/command/IntegerResponse.hpp"
#include "activemq/command/ProducerInfo.hpp"
#include "activemq/command/BrokerInfo.hpp"
#include "activemq/command/KeepAliveInfo.hpp"
#include "activemq/command/ConnectionInfo.hpp"
#include "activemq/command/RemoveInfo.hpp"
#include "activemq/command/RemoveSubscriptionInfo.hpp"
#include "activemq/command/SessionInfo.hpp"
#include "activemq/command/TransactionInfo.hpp"
#include "activemq/command/WireFormatInfo.hpp"
#include "activemq/command/BrokerId.hpp"
#include "activemq/command/ShutdownInfo.hpp"
using namespace apache::activemq::command;
/*
*
*/
int AbstractCommand::getCommandId()
{
return commandId ;
}
/*
*
*/
void AbstractCommand::setCommandId(int id)
{
commandId = id ;
}
/*
*
*/
bool AbstractCommand::getResponseRequired()
{
return responseRequired ;
}
/*
*
*/
void AbstractCommand::setResponseRequired(bool value)
{
responseRequired = value ;
}
/*
*
*/
unsigned char AbstractCommand::getDataStructureType()
{
return 0 ;
}
/*
*
*/
bool AbstractCommand::isMarshallAware()
{
return false ;
}
/*
*
*/
int AbstractCommand::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) throw(IOException)
{
return 0 ;
}
/*
*
*/
void AbstractCommand::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) throw(IOException)
{
}
/*
*
*/
p<IDataStructure> AbstractCommand::createObject(unsigned char type)
{
switch( type )
{
case ActiveMQMessage::TYPE:
return new ActiveMQMessage() ;
case ActiveMQTextMessage::TYPE:
return new ActiveMQTextMessage() ;
case ActiveMQObjectMessage::TYPE:
return new ActiveMQObjectMessage() ;
case ActiveMQBytesMessage::TYPE:
return new ActiveMQBytesMessage() ;
case ActiveMQStreamMessage::TYPE:
return new ActiveMQStreamMessage() ;
case ActiveMQMapMessage::TYPE:
return new ActiveMQMapMessage() ;
case ActiveMQQueue::TYPE:
return new ActiveMQQueue() ;
case ConsumerId::TYPE:
return new ConsumerId() ;
case ProducerId::TYPE:
return new ProducerId() ;
case MessageId::TYPE:
return new MessageId() ;
case LocalTransactionId::TYPE:
return new LocalTransactionId() ;
case MessageAck::TYPE:
return new MessageAck() ;
case MessageDispatch::TYPE:
return new MessageDispatch() ;
case Response::TYPE:
return new Response() ;
case ConsumerInfo::TYPE:
return new ConsumerInfo() ;
case ProducerInfo::TYPE:
return new ProducerInfo() ;
case TransactionInfo::TYPE:
return new TransactionInfo() ;
case BrokerInfo::TYPE:
return new BrokerInfo() ;
case BrokerId::TYPE:
return new BrokerId() ;
case ConnectionInfo::TYPE:
return new ConnectionInfo() ;
case SessionInfo::TYPE:
return new SessionInfo() ;
case RemoveSubscriptionInfo::TYPE:
return new RemoveSubscriptionInfo() ;
case IntegerResponse::TYPE:
return new IntegerResponse() ;
case WireFormatInfo::TYPE:
return new WireFormatInfo() ;
case RemoveInfo::TYPE:
return new RemoveInfo() ;
case KeepAliveInfo::TYPE:
return new KeepAliveInfo() ;
case ShutdownInfo::TYPE:
return new ShutdownInfo() ;
default:
return NULL ;
}
}
/*
*
*/
p<string> AbstractCommand::getDataStructureTypeAsString(unsigned char type)
{
p<string> packetType = new string() ;
switch( type )
{
case ActiveMQMessage::TYPE:
packetType->assign("ACTIVEMQ_MESSAGE") ;
break ;
case ActiveMQTextMessage::TYPE:
packetType->assign("ACTIVEMQ_TEXT_MESSAGE") ;
break ;
case ActiveMQObjectMessage::TYPE:
packetType->assign("ACTIVEMQ_OBJECT_MESSAGE") ;
break ;
case ActiveMQBytesMessage::TYPE:
packetType->assign("ACTIVEMQ_BYTES_MESSAGE") ;
break ;
case ActiveMQStreamMessage::TYPE:
packetType->assign("ACTIVEMQ_STREAM_MESSAGE") ;
break ;
case ActiveMQMapMessage::TYPE:
packetType->assign("ACTIVEMQ_MAP_MESSAGE") ;
break ;
case ActiveMQQueue::TYPE:
packetType->assign("ACTIVEMQ_QUEUE") ;
break ;
case ConsumerId::TYPE:
packetType->assign("CONSUMER_ID") ;
break ;
case ProducerId::TYPE:
packetType->assign("PRODUCER_ID") ;
break ;
case MessageId::TYPE:
packetType->assign("MESSAGE_ID") ;
break ;
case LocalTransactionId::TYPE:
packetType->assign("LOCAL_TRANSACTION_ID") ;
break ;
case MessageAck::TYPE:
packetType->assign("ACTIVEMQ_MSG_ACK") ;
break ;
case MessageDispatch::TYPE:
packetType->assign("ACTIVEMQ_MSG_DISPATCH") ;
break ;
case Response::TYPE:
packetType->assign("RESPONSE") ;
break ;
case ConsumerInfo::TYPE:
packetType->assign("CONSUMER_INFO") ;
break ;
case ProducerInfo::TYPE:
packetType->assign("PRODUCER_INFO") ;
break;
case TransactionInfo::TYPE:
packetType->assign("TRANSACTION_INFO") ;
break ;
case BrokerInfo::TYPE:
packetType->assign("BROKER_INFO") ;
break ;
case ConnectionInfo::TYPE:
packetType->assign("CONNECTION_INFO") ;
break ;
case SessionInfo::TYPE:
packetType->assign("SESSION_INFO") ;
break ;
case RemoveSubscriptionInfo::TYPE:
packetType->assign("DURABLE_UNSUBSCRIBE") ;
break ;
case IntegerResponse::TYPE:
packetType->assign("INT_RESPONSE_RECEIPT_INFO") ;
break ;
case WireFormatInfo::TYPE:
packetType->assign("WIRE_FORMAT_INFO") ;
break ;
case RemoveInfo::TYPE:
packetType->assign("REMOVE_INFO") ;
break ;
case KeepAliveInfo::TYPE:
packetType->assign("KEEP_ALIVE") ;
break ;
case ShutdownInfo::TYPE:
packetType->assign("SHUTDOWN") ;
break ;
default:
packetType->assign("UNDEFINED") ;
break ;
}
return packetType ;
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string.h>
#include "activemq/command/AbstractCommand.hpp"
#include "activemq/command/ActiveMQMessage.hpp"
#include "activemq/command/ActiveMQBytesMessage.hpp"
#include "activemq/command/ActiveMQMapMessage.hpp"
#include "activemq/command/ActiveMQObjectMessage.hpp"
#include "activemq/command/ActiveMQStreamMessage.hpp"
#include "activemq/command/ActiveMQTextMessage.hpp"
#include "activemq/command/ActiveMQQueue.hpp"
#include "activemq/command/ConsumerId.hpp"
#include "activemq/command/ProducerId.hpp"
#include "activemq/command/MessageId.hpp"
#include "activemq/command/LocalTransactionId.hpp"
#include "activemq/command/MessageAck.hpp"
#include "activemq/command/MessageDispatch.hpp"
#include "activemq/command/Response.hpp"
#include "activemq/command/ConsumerInfo.hpp"
#include "activemq/command/IntegerResponse.hpp"
#include "activemq/command/ProducerInfo.hpp"
#include "activemq/command/BrokerInfo.hpp"
#include "activemq/command/KeepAliveInfo.hpp"
#include "activemq/command/ConnectionInfo.hpp"
#include "activemq/command/RemoveInfo.hpp"
#include "activemq/command/RemoveSubscriptionInfo.hpp"
#include "activemq/command/SessionInfo.hpp"
#include "activemq/command/TransactionInfo.hpp"
#include "activemq/command/WireFormatInfo.hpp"
#include "activemq/command/BrokerId.hpp"
#include "activemq/command/ShutdownInfo.hpp"
using namespace apache::activemq::command;
/*
*
*/
int AbstractCommand::getCommandId()
{
return commandId ;
}
/*
*
*/
void AbstractCommand::setCommandId(int id)
{
commandId = id ;
}
/*
*
*/
bool AbstractCommand::getResponseRequired()
{
return responseRequired ;
}
/*
*
*/
void AbstractCommand::setResponseRequired(bool value)
{
responseRequired = value ;
}
/*
*
*/
unsigned char AbstractCommand::getDataStructureType()
{
return 0 ;
}
/*
*
*/
bool AbstractCommand::isMarshallAware()
{
return false ;
}
/*
*
*/
int AbstractCommand::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) throw(IOException)
{
return 0 ;
}
/*
*
*/
void AbstractCommand::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) throw(IOException)
{
}
/*
*
*/
p<IDataStructure> AbstractCommand::createObject(unsigned char type)
{
switch( type )
{
case ActiveMQMessage::TYPE:
return new ActiveMQMessage() ;
case ActiveMQTextMessage::TYPE:
return new ActiveMQTextMessage() ;
case ActiveMQObjectMessage::TYPE:
return new ActiveMQObjectMessage() ;
case ActiveMQBytesMessage::TYPE:
return new ActiveMQBytesMessage() ;
case ActiveMQStreamMessage::TYPE:
return new ActiveMQStreamMessage() ;
case ActiveMQMapMessage::TYPE:
return new ActiveMQMapMessage() ;
case ActiveMQQueue::TYPE:
return new ActiveMQQueue() ;
case ConsumerId::TYPE:
return new ConsumerId() ;
case ProducerId::TYPE:
return new ProducerId() ;
case MessageId::TYPE:
return new MessageId() ;
case LocalTransactionId::TYPE:
return new LocalTransactionId() ;
case MessageAck::TYPE:
return new MessageAck() ;
case MessageDispatch::TYPE:
return new MessageDispatch() ;
case Response::TYPE:
return new Response() ;
case ConsumerInfo::TYPE:
return new ConsumerInfo() ;
case ProducerInfo::TYPE:
return new ProducerInfo() ;
case TransactionInfo::TYPE:
return new TransactionInfo() ;
case BrokerInfo::TYPE:
return new BrokerInfo() ;
case BrokerId::TYPE:
return new BrokerId() ;
case ConnectionInfo::TYPE:
return new ConnectionInfo() ;
case SessionInfo::TYPE:
return new SessionInfo() ;
case RemoveSubscriptionInfo::TYPE:
return new RemoveSubscriptionInfo() ;
case IntegerResponse::TYPE:
return new IntegerResponse() ;
case WireFormatInfo::TYPE:
return new WireFormatInfo() ;
case RemoveInfo::TYPE:
return new RemoveInfo() ;
case KeepAliveInfo::TYPE:
return new KeepAliveInfo() ;
case ShutdownInfo::TYPE:
return new ShutdownInfo() ;
default:
return NULL ;
}
}
/*
*
*/
p<string> AbstractCommand::getDataStructureTypeAsString(unsigned char type)
{
p<string> packetType = new string() ;
switch( type )
{
case ActiveMQMessage::TYPE:
packetType->assign("ACTIVEMQ_MESSAGE") ;
break ;
case ActiveMQTextMessage::TYPE:
packetType->assign("ACTIVEMQ_TEXT_MESSAGE") ;
break ;
case ActiveMQObjectMessage::TYPE:
packetType->assign("ACTIVEMQ_OBJECT_MESSAGE") ;
break ;
case ActiveMQBytesMessage::TYPE:
packetType->assign("ACTIVEMQ_BYTES_MESSAGE") ;
break ;
case ActiveMQStreamMessage::TYPE:
packetType->assign("ACTIVEMQ_STREAM_MESSAGE") ;
break ;
case ActiveMQMapMessage::TYPE:
packetType->assign("ACTIVEMQ_MAP_MESSAGE") ;
break ;
case ActiveMQQueue::TYPE:
packetType->assign("ACTIVEMQ_QUEUE") ;
break ;
case ConsumerId::TYPE:
packetType->assign("CONSUMER_ID") ;
break ;
case ProducerId::TYPE:
packetType->assign("PRODUCER_ID") ;
break ;
case MessageId::TYPE:
packetType->assign("MESSAGE_ID") ;
break ;
case LocalTransactionId::TYPE:
packetType->assign("LOCAL_TRANSACTION_ID") ;
break ;
case MessageAck::TYPE:
packetType->assign("ACTIVEMQ_MSG_ACK") ;
break ;
case MessageDispatch::TYPE:
packetType->assign("ACTIVEMQ_MSG_DISPATCH") ;
break ;
case Response::TYPE:
packetType->assign("RESPONSE") ;
break ;
case ConsumerInfo::TYPE:
packetType->assign("CONSUMER_INFO") ;
break ;
case ProducerInfo::TYPE:
packetType->assign("PRODUCER_INFO") ;
break;
case TransactionInfo::TYPE:
packetType->assign("TRANSACTION_INFO") ;
break ;
case BrokerInfo::TYPE:
packetType->assign("BROKER_INFO") ;
break ;
case ConnectionInfo::TYPE:
packetType->assign("CONNECTION_INFO") ;
break ;
case SessionInfo::TYPE:
packetType->assign("SESSION_INFO") ;
break ;
case RemoveSubscriptionInfo::TYPE:
packetType->assign("DURABLE_UNSUBSCRIBE") ;
break ;
case IntegerResponse::TYPE:
packetType->assign("INT_RESPONSE_RECEIPT_INFO") ;
break ;
case WireFormatInfo::TYPE:
packetType->assign("WIRE_FORMAT_INFO") ;
break ;
case RemoveInfo::TYPE:
packetType->assign("REMOVE_INFO") ;
break ;
case KeepAliveInfo::TYPE:
packetType->assign("KEEP_ALIVE") ;
break ;
case ShutdownInfo::TYPE:
packetType->assign("SHUTDOWN") ;
break ;
default:
packetType->assign("UNDEFINED") ;
break ;
}
return packetType ;
}

View File

@ -1,56 +1,56 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TestListener.hpp"
/*
*
*/
TestListener::TestListener()
{
}
/*
*
*/
TestListener::~TestListener()
{
}
/*
*
*/
void TestListener::onMessage(p<IMessage> message)
{
try
{
p<IBytesMessage> msg = p_dyncast<IBytesMessage> (message) ;
if( msg == NULL )
cout << "No message received!" << endl ;
else
{
cout << "Received message with ID: " << msg->getJMSMessageID()->c_str() << endl ;
cout << " boolean: " << (msg->readBoolean() ? "true" : "false") << endl ;
cout << " integer: " << msg->readInt() << endl ;
cout << " string: " << msg->readUTF()->c_str() << endl ;
}
}
catch( exception& e )
{
cout << "OnMessage caught: " << e.what() << endl ;
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TestListener.hpp"
/*
*
*/
TestListener::TestListener()
{
}
/*
*
*/
TestListener::~TestListener()
{
}
/*
*
*/
void TestListener::onMessage(p<IMessage> message)
{
try
{
p<IBytesMessage> msg = p_dyncast<IBytesMessage> (message) ;
if( msg == NULL )
cout << "No message received!" << endl ;
else
{
cout << "Received message with ID: " << msg->getJMSMessageID()->c_str() << endl ;
cout << " boolean: " << (msg->readBoolean() ? "true" : "false") << endl ;
cout << " integer: " << msg->readInt() << endl ;
cout << " string: " << msg->readUTF()->c_str() << endl ;
}
}
catch( exception& e )
{
cout << "OnMessage caught: " << e.what() << endl ;
}
}

View File

@ -1,44 +1,44 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef TestListener_hpp_
#define TestListener_hpp_
#include <iostream>
#include <string>
#include "cms/IMessage.hpp"
#include "cms/IBytesMessage.hpp"
#include "cms/IMessageListener.hpp"
#include "ppr/util/ifr/p"
using namespace apache::cms;
using namespace ifr;
using namespace std;
/*
*
*/
class TestListener : public IMessageListener
{
public:
TestListener() ;
virtual ~TestListener() ;
virtual void onMessage(p<IMessage> message) ;
} ;
#endif /*TestListener_hpp_*/
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef TestListener_hpp_
#define TestListener_hpp_
#include <iostream>
#include <string>
#include "cms/IMessage.hpp"
#include "cms/IBytesMessage.hpp"
#include "cms/IMessageListener.hpp"
#include "ppr/util/ifr/p"
using namespace apache::cms;
using namespace ifr;
using namespace std;
/*
*
*/
class TestListener : public IMessageListener
{
public:
TestListener() ;
virtual ~TestListener() ;
virtual void onMessage(p<IMessage> message) ;
} ;
#endif /*TestListener_hpp_*/

View File

@ -1,192 +1,192 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include "cms/IConnectionFactory.hpp"
#include "cms/IConnection.hpp"
#include "cms/IDestination.hpp"
#include "cms/IMessageConsumer.hpp"
#include "cms/IMessageProducer.hpp"
#include "cms/ISession.hpp"
#include "cms/ITextMessage.hpp"
#include "activemq/ConnectionFactory.hpp"
#include "activemq/Connection.hpp"
#include "activemq/command/ActiveMQTextMessage.hpp"
#include "ppr/TraceException.hpp"
#include "ppr/util/MapItemHolder.hpp"
#include "ppr/net/Uri.hpp"
#include "ppr/util/ifr/p"
#include "TestListener.hpp"
using namespace apache::activemq;
using namespace apache::activemq::command;
using namespace apache::cms;
using namespace apache::ppr;
using namespace apache::ppr::net;
using namespace apache::ppr::util;
using namespace ifr;
using namespace std;
/*
* Tests synchronous sending/receiving of a text message
*/
void testSyncTextMessage()
{
try
{
p<IConnectionFactory> factory ;
p<IConnection> connection ;
p<ISession> session ;
p<IQueue> queue ;
p<IMessageConsumer> consumer ;
p<IMessageProducer> producer ;
p<ITextMessage> reqMessage,
rspMessage ;
p<Uri> uri ;
p<PropertyMap> props ;
cout << "Connecting to ActiveMQ broker..." << endl ;
uri = new Uri("tcp://127.0.0.1:61616?trace=true&protocol=openwire") ;
factory = new ConnectionFactory(uri) ;
connection = factory->createConnection() ;
// Create session
session = connection->createSession() ;
// Connect to queue
queue = session->getQueue("FOO.BAR") ;
cout << "Using destination: " << queue->getQueueName()->c_str() << endl ;
// Create a consumer and producer
consumer = session->createConsumer(queue) ;
producer = session->createProducer(queue) ;
producer->setPersistent(true) ;
// Create a message
reqMessage = session->createTextMessage("Hello World!") ;
reqMessage->setJMSCorrelationID("abc") ;
reqMessage->setJMSXGroupID("cheese") ;
props = reqMessage->getProperties() ;
(*props)["someHeader"] = MapItemHolder( "James" ) ;
// Send message
producer->send(reqMessage) ;
cout << "Waiting for asynchrounous receive message..." << endl ;
// Receive and wait for a message
rspMessage = p_dyncast<ActiveMQTextMessage> (consumer->receive()) ;
if( rspMessage == NULL )
cout << "No message received!" << endl ;
else
{
cout << "Received message with ID: " << rspMessage->getJMSMessageID()->c_str() << endl ;
cout << " and text: " << rspMessage->getText()->c_str() << endl ;
}
// Shutdown gracefully (including all attached sub-items, sessions, consumer/producer)
connection->close() ;
cout << "Disconnected from ActiveMQ broker" << endl ;
}
catch( TraceException& te )
{
cout << "Caught: " << te.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
catch( exception& e )
{
cout << "Caught: " << e.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
}
/*
* Tests asynchronous sending/receiving of a binary message
*/
void testAsyncByteMessage()
{
try
{
p<IConnectionFactory> factory ;
p<IConnection> connection ;
p<ISession> session ;
p<IQueue> queue ;
p<IMessageConsumer> consumer ;
p<IMessageProducer> producer ;
p<IBytesMessage> reqMessage,
rspMessage ;
p<Uri> uri ;
p<PropertyMap> props ;
p<TestListener> listener ;
cout << "Connecting to ActiveMQ broker..." << endl ;
uri = new Uri("tcp://127.0.0.1:61616?trace=true&protocol=openwire") ;
factory = new ConnectionFactory(uri) ;
connection = factory->createConnection() ;
// Create session
session = connection->createSession() ;
// Connect to queue
queue = session->getQueue("FOO.BAR") ;
cout << "Using destination: " << queue->getQueueName()->c_str() << endl ;
// Create producer and a asycnhrounous consumer
producer = session->createProducer(queue) ;
producer->setPersistent(true) ;
consumer = session->createConsumer(queue) ;
listener = new TestListener() ;
consumer->setMessageListener(listener) ;
// Create binary message
reqMessage = session->createBytesMessage() ;
reqMessage->writeBoolean(true) ;
reqMessage->writeInt(3677490) ;
reqMessage->writeUTF("Hello Binary World!") ;
// Send message
producer->send(reqMessage) ;
// Wait for asynchronous message
char c = getchar() ;
// Shutdown gracefully (including all attached sub-items, sessions, consumer/producer)
connection->close() ;
cout << "Disconnected from ActiveMQ broker" << endl ;
}
catch( TraceException& te )
{
cout << "Caught: " << te.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
catch( exception& e )
{
cout << "Caught: " << e.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
}
int main()
{
testSyncTextMessage() ;
testAsyncByteMessage() ;
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include "cms/IConnectionFactory.hpp"
#include "cms/IConnection.hpp"
#include "cms/IDestination.hpp"
#include "cms/IMessageConsumer.hpp"
#include "cms/IMessageProducer.hpp"
#include "cms/ISession.hpp"
#include "cms/ITextMessage.hpp"
#include "activemq/ConnectionFactory.hpp"
#include "activemq/Connection.hpp"
#include "activemq/command/ActiveMQTextMessage.hpp"
#include "ppr/TraceException.hpp"
#include "ppr/util/MapItemHolder.hpp"
#include "ppr/net/Uri.hpp"
#include "ppr/util/ifr/p"
#include "TestListener.hpp"
using namespace apache::activemq;
using namespace apache::activemq::command;
using namespace apache::cms;
using namespace apache::ppr;
using namespace apache::ppr::net;
using namespace apache::ppr::util;
using namespace ifr;
using namespace std;
/*
* Tests synchronous sending/receiving of a text message
*/
void testSyncTextMessage()
{
try
{
p<IConnectionFactory> factory ;
p<IConnection> connection ;
p<ISession> session ;
p<IQueue> queue ;
p<IMessageConsumer> consumer ;
p<IMessageProducer> producer ;
p<ITextMessage> reqMessage,
rspMessage ;
p<Uri> uri ;
p<PropertyMap> props ;
cout << "Connecting to ActiveMQ broker..." << endl ;
uri = new Uri("tcp://127.0.0.1:61616?trace=true&protocol=openwire") ;
factory = new ConnectionFactory(uri) ;
connection = factory->createConnection() ;
// Create session
session = connection->createSession() ;
// Connect to queue
queue = session->getQueue("FOO.BAR") ;
cout << "Using destination: " << queue->getQueueName()->c_str() << endl ;
// Create a consumer and producer
consumer = session->createConsumer(queue) ;
producer = session->createProducer(queue) ;
producer->setPersistent(true) ;
// Create a message
reqMessage = session->createTextMessage("Hello World!") ;
reqMessage->setJMSCorrelationID("abc") ;
reqMessage->setJMSXGroupID("cheese") ;
props = reqMessage->getProperties() ;
(*props)["someHeader"] = MapItemHolder( "James" ) ;
// Send message
producer->send(reqMessage) ;
cout << "Waiting for asynchrounous receive message..." << endl ;
// Receive and wait for a message
rspMessage = p_dyncast<ActiveMQTextMessage> (consumer->receive()) ;
if( rspMessage == NULL )
cout << "No message received!" << endl ;
else
{
cout << "Received message with ID: " << rspMessage->getJMSMessageID()->c_str() << endl ;
cout << " and text: " << rspMessage->getText()->c_str() << endl ;
}
// Shutdown gracefully (including all attached sub-items, sessions, consumer/producer)
connection->close() ;
cout << "Disconnected from ActiveMQ broker" << endl ;
}
catch( TraceException& te )
{
cout << "Caught: " << te.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
catch( exception& e )
{
cout << "Caught: " << e.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
}
/*
* Tests asynchronous sending/receiving of a binary message
*/
void testAsyncByteMessage()
{
try
{
p<IConnectionFactory> factory ;
p<IConnection> connection ;
p<ISession> session ;
p<IQueue> queue ;
p<IMessageConsumer> consumer ;
p<IMessageProducer> producer ;
p<IBytesMessage> reqMessage,
rspMessage ;
p<Uri> uri ;
p<PropertyMap> props ;
p<TestListener> listener ;
cout << "Connecting to ActiveMQ broker..." << endl ;
uri = new Uri("tcp://127.0.0.1:61616?trace=true&protocol=openwire") ;
factory = new ConnectionFactory(uri) ;
connection = factory->createConnection() ;
// Create session
session = connection->createSession() ;
// Connect to queue
queue = session->getQueue("FOO.BAR") ;
cout << "Using destination: " << queue->getQueueName()->c_str() << endl ;
// Create producer and a asycnhrounous consumer
producer = session->createProducer(queue) ;
producer->setPersistent(true) ;
consumer = session->createConsumer(queue) ;
listener = new TestListener() ;
consumer->setMessageListener(listener) ;
// Create binary message
reqMessage = session->createBytesMessage() ;
reqMessage->writeBoolean(true) ;
reqMessage->writeInt(3677490) ;
reqMessage->writeUTF("Hello Binary World!") ;
// Send message
producer->send(reqMessage) ;
// Wait for asynchronous message
char c = getchar() ;
// Shutdown gracefully (including all attached sub-items, sessions, consumer/producer)
connection->close() ;
cout << "Disconnected from ActiveMQ broker" << endl ;
}
catch( TraceException& te )
{
cout << "Caught: " << te.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
catch( exception& e )
{
cout << "Caught: " << e.what() << endl ;
//cout << "Stack: " << e.getStackTrace() ;
}
}
int main()
{
testSyncTextMessage() ;
testAsyncByteMessage() ;
}