mirror of https://github.com/apache/activemq.git
Fixed handling of bytes message for Jira issue AMQ-685
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@396330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c47de061b7
commit
c1b8e0410d
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
|
||||
#include <activemq/ActiveMQTextMessage.h>
|
||||
#include <activemq/ActiveMQBytesMessage.h>
|
||||
#include <cms/TopicConnectionFactory.h>
|
||||
#include <cms/TopicConnection.h>
|
||||
#include <cms/TopicSession.h>
|
||||
|
@ -46,13 +47,13 @@ public:
|
|||
|
||||
try{
|
||||
|
||||
int numMessages = 1000;
|
||||
int sleepTime = 10;
|
||||
int messagesPerType = 3000;
|
||||
int sleepTime = 2;
|
||||
|
||||
printf("Starting activemqcms test (sending %d messages and sleeping %d seconds) ...\n", numMessages, sleepTime );
|
||||
printf("Starting activemqcms test (sending %d messages per type and sleeping %d seconds) ...\n", messagesPerType, sleepTime );
|
||||
|
||||
// START SNIPPET: demo
|
||||
cms::TopicConnectionFactory* connectionFactory = new activemq::ActiveMQConnectionFactory( "127.0.0.1:61626" );
|
||||
cms::TopicConnectionFactory* connectionFactory = new activemq::ActiveMQConnectionFactory( "127.0.0.1:61613" );
|
||||
cms::TopicConnection* connection = connectionFactory->createTopicConnection();
|
||||
connection->setExceptionListener( this );
|
||||
connection->start();
|
||||
|
@ -62,22 +63,46 @@ public:
|
|||
subscriber->setMessageListener( this );
|
||||
cms::TopicPublisher* publisher = session->createPublisher( topic );
|
||||
|
||||
// Send some text messages.
|
||||
const char* text = "this is a test!";
|
||||
cms::TextMessage* msg = session->createTextMessage( text );
|
||||
|
||||
for( int ix=0; ix<numMessages; ++ix ){
|
||||
publisher->publish( msg );
|
||||
doSleep();
|
||||
cms::TextMessage* textMsg = session->createTextMessage( text );
|
||||
for( int ix=0; ix<messagesPerType; ++ix ){
|
||||
publisher->publish( textMsg );
|
||||
doSleep();
|
||||
}
|
||||
|
||||
// Send some bytes messages.
|
||||
char buf[10];
|
||||
memset( buf, 0, 10 );
|
||||
buf[0] = 0;
|
||||
buf[1] = 1;
|
||||
buf[2] = 2;
|
||||
buf[3] = 3;
|
||||
buf[4] = 0;
|
||||
buf[5] = 4;
|
||||
buf[6] = 5;
|
||||
buf[7] = 6;
|
||||
cms::BytesMessage* bytesMsg = session->createBytesMessage();
|
||||
bytesMsg->setData( buf, 10 );
|
||||
for( int ix=0; ix<messagesPerType; ++ix ){
|
||||
publisher->publish( bytesMsg );
|
||||
doSleep();
|
||||
}
|
||||
// END SNIPPET: demo
|
||||
|
||||
sleep( sleepTime );
|
||||
|
||||
printf("received: %d\n", numReceived );
|
||||
|
||||
delete publisher;
|
||||
sleep( 5 );
|
||||
|
||||
printf("unsubscribing\n" );
|
||||
delete publisher;
|
||||
subscriber->close();
|
||||
delete subscriber;
|
||||
|
||||
sleep( 5 );
|
||||
|
||||
session->close();
|
||||
delete session;
|
||||
connection->close();
|
||||
|
@ -90,14 +115,26 @@ public:
|
|||
}
|
||||
|
||||
virtual void onMessage( const cms::Message* message ){
|
||||
|
||||
// Got a text message.
|
||||
const cms::TextMessage* txtMsg = dynamic_cast<const cms::TextMessage*>(message);
|
||||
if( txtMsg == NULL ){
|
||||
printf("received non-text message\n" );
|
||||
return;
|
||||
if( txtMsg != NULL ){
|
||||
//printf("received text msg: %s\n", txtMsg->getText() );
|
||||
}
|
||||
|
||||
// Got a bytes msg.
|
||||
const cms::BytesMessage* bytesMsg = dynamic_cast<const cms::BytesMessage*>(message);
|
||||
if( bytesMsg != NULL ){
|
||||
/*printf("received bytes msg: " );
|
||||
const char* bytes = bytesMsg->getData();
|
||||
int numBytes = bytesMsg->getNumBytes();
|
||||
for( int ix=0; ix<numBytes; ++ix ){
|
||||
printf("[%d]", bytes[ix] );
|
||||
}
|
||||
printf("\n");*/
|
||||
}
|
||||
|
||||
numReceived++;
|
||||
//printf( "[%d]: %s\n", ++ix, txtMsg->getMessage() );
|
||||
}
|
||||
|
||||
virtual void onException( const cms::CMSException* error ){
|
||||
|
|
Loading…
Reference in New Issue