/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include #include #include "RCSID.h" #include #include #include #include #include #include #include "TextMessage.h" #include "BytesMessage.h" #include "PrimitiveMap.h" #include "MessageConsumer.h" #include "NonBlockingMessageConsumer.h" #include "BlockingMessageConsumer.h" #include "NonBlockingMessageConsumerRef.h" #include "BlockingMessageConsumerRef.h" #include "Exception.h" #include "Destination.h" #include "CoreLibImpl_.h" #include "StompMessage.h" #include "NullLogger.h" #include "command/CommandTypes.h" #include "command/ConnectionInfo.h" #include "command/SessionInfo.h" #include "command/ConsumerInfo.h" #include "command/WireFormatInfo.h" #include "command/ProducerInfo.h" #include "command/ShutdownInfo.h" #include "command/ActiveMQTopic.h" #include "command/ActiveMQTempTopic.h" #include "command/ActiveMQTempQueue.h" #include "command/ActiveMQQueue.h" #include "command/ActiveMQTextMessage.h" #include "command/ActiveMQBytesMessage.h" #include "command/ActiveMQDestination.h" #include "command/MessageDispatch.h" #include "command/Response.h" #include "command/BrokerInfo.h" #include "command/RemoveInfo.h" #include "command/DestinationInfo.h" #include "command/MessageAck.h" #include "marshal/BaseDataStreamMarshaller.h" 
#include "marshal/ProtocolFormat.h"
#include "marshal/MarshallerFactory.h"
#include "marshal/BufferWriter.h"
#include "marshal/BufferReader.h"

using namespace ActiveMQ;
using namespace std;
using boost::shared_ptr;

RCSID(CoreLibImpl, "$Id$");

// NOTE(review): in this copy of the file the template argument lists
// (on shared_ptr, auto_ptr, vector, list, map, pair, static_cast and
// const_cast) are missing.  They are left exactly as found below; restore
// them from the original sources / CoreLibImpl_.h before compiling.

// Logging helpers.  Each one is a no-op unless a logger is installed and
// the corresponding level is enabled on it.
#define FATAL(msg) \
    do { if (logger_.get() && logger_->isEnabled(LogLevel::Fatal)) \
        logger_->logFatal(msg); } while(0)

#define ERROR(msg) \
    do { if (logger_.get() && logger_->isEnabled(LogLevel::Error)) \
        logger_->logError(msg); } while(0)

#define WARNING(msg) \
    do { if (logger_.get() && logger_->isEnabled(LogLevel::Warning)) \
        logger_->logWarning(msg); } while(0)

#define INFORM(msg) \
    do { if (logger_.get() && logger_->isEnabled(LogLevel::Inform)) \
        logger_->logInform(msg); } while(0)

#define DEBUG(msg) \
    do { if (logger_.get() && logger_->isEnabled(LogLevel::Debug)) \
        logger_->logDebug(msg); } while(0)

const static int OPENWIRE_VERSION = 1;   // version sent in WireFormatInfo
const static int PREFETCH_SIZE = 32766;  // prefetch size set on every ConsumerInfo
const static int SESSION_ID = 12345;     // the single session id this client uses
const static int STANDARD_ACK = 2;       // ack type that tells the broker to discard the message
const static int ADD_DEST = 0;           // DestinationInfo operation type: add
const static int REMOVE_DEST = 1;        // DestinationInfo operation type: remove
const static UUIDGenerator uuidgen;
const static string activemq = "ActiveMQ";  // WireFormatInfo magic prefix

// Appends a 32-bit value to the end of b in network byte order.
static void appendNetworkOrder32(Buffer& b, uint32_t hostValue) {
    const uint32_t wire = htonl(hostValue);
    const uint8_t *bytes = reinterpret_cast<const uint8_t *>(&wire);
    b.insert(b.end(), bytes, bytes + sizeof(wire));
}

// Builds the implementation object for the given parent library handle.
// All id counters start at zero, a NullLogger is installed, and fresh
// GUIDs are generated for the client id and the connection id.
CoreLibImpl::CoreLibImpl(CoreLib *parent,
                         const string& user,
                         const string& password) :
    parent_(parent),
    user_(user),
    password_(password),
    sizeBufPos_(0),
    inMessage_(false),
    yetToRecv_(0),
    sessionId_(SESSION_ID),
    nextCommandId_(0),
    nextProducerId_(0),
    nextConsumerId_(0),
    nextTempDestId_(0),
    logger_(new NullLogger()),
    initialized_(false) {
    Marshalling::MarshallerFactory::configure(pf_);
    clientId_ = uuidgen.getGuid();
    connectionId_ = uuidgen.getGuid();
}

// Moves any bytes queued in pendingBuffer_ (commands marshalled by calls
// that had no outgoing buffer available) onto the end of b and empties
// the pending buffer.
void
CoreLibImpl::marshalPending_(Buffer& b) {
    if (!(pendingBuffer_.empty())) {
        b.insert(b.end(), pendingBuffer_.begin(), pendingBuffer_.end());
        pendingBuffer_.resize(0);
    }
}

// Stamps the command with the next command id and marshals it onto b
// through the AbstractCommand overload.
void
CoreLibImpl::marshalCommand_(Command::BaseCommand& o, Buffer& b) {
    o.setCommandId(nextCommandId_++);
    marshalCommand_(static_cast(o), b);
}

// Hand-writes a WireFormatInfo frame onto the end of b:
//   4-byte length | type byte (1) | "ActiveMQ" | 4-byte version |
//   presence byte (1) | 4-byte properties length | properties bytes
// All multi-byte integers are written in network byte order.  The length
// field counts every byte of the frame that follows it.
void
loosemarshalcommand(Command::WireFormatInfo& wfi, Buffer& b) {
    // Remember where this frame starts so the length prefix lands in front
    // of *this* frame even when b already holds earlier data.
    const size_t start = b.size();

    // size - placeholder, patched once the frame has been written
    b.resize(start + 4);
    // type
    b.push_back(1);
    // magic
    b.insert(b.end(), activemq.begin(), activemq.end());
    // openwire version
    appendNetworkOrder32(b, OPENWIRE_VERSION);
    // marshalled properties
    // true it's there
    b.push_back(1);
    // length
    appendNetworkOrder32(b, wfi.getMarshalledProperties().size());
    // data
    b.insert(b.end(),
             wfi.getMarshalledProperties().begin(),
             wfi.getMarshalledProperties().end());

    // Exactly four bytes are patched in, so the value must be a uint32_t
    // (a wider type would copy the wrong half on big-endian hosts).
    const uint32_t length = htonl(static_cast<uint32_t>(b.size() - start - 4));
    memcpy(&b[start], &length, sizeof(length));
}

// Marshals an arbitrary command onto the end of b using the two-pass
// marshaller registered for its command type.
// Throws Exception when no marshaller is registered for that type.
void
CoreLibImpl::marshalCommand_(Command::AbstractCommand& o, Buffer& b) {
    IO::BooleanStream bs;

    // get the marshaller
    Marshalling::BaseDataStreamMarshaller* marshaller = pf_.getMarshaller(o.getCommandType());
    if (marshaller == NULL) {
        // Build the text with << only: a stringstream constructed from a
        // string starts writing at offset 0 and would overwrite the prefix.
        // The cast makes a byte-sized type id print as a number.
        stringstream exmsg;
        exmsg << "Invalid command type ("
              << static_cast<int>(o.getCommandType())
              << ") passed to marshalCommand";
        throw Exception(exmsg.str());
    }

    // first pass gets the size and writes the flags bitset
    uint32_t size = marshaller->marshal1(pf_, o, bs);

    // first 4 bytes of a message are its size
    appendNetworkOrder32(b, size + bs.getMarshalledSize());

    // then one byte that is the command type
    b.push_back(o.getCommandType());

    IO::BufferWriter bw(b);

    // then the bitset
    bs.marshal(bw);

    // then the second marshalling pass which actually writes the data out
    marshaller->marshal2(pf_, o, bw, bs);
}

// Populates o from the bytes in b using the marshaller registered for o's
// command type.  Throws Exception when no marshaller is registered.
void
CoreLibImpl::unmarshalCommand_(Command::AbstractCommand& o, const Buffer& b) {
    IO::BufferReader br(b);
    IO::BooleanStream bs(br);

    Marshalling::BaseDataStreamMarshaller *marshaller = pf_.getMarshaller(o.getCommandType());
    if (marshaller == NULL) {
        // See marshalCommand_ for why the message is built with << only.
        stringstream exmsg;
        exmsg << "Invalid command type ("
              << static_cast<int>(o.getCommandType())
              << ") received from broker";
        throw Exception(exmsg.str());
    }

    marshaller->unmarshal(pf_, o, br, bs);
}

// Appends the connection handshake to b: a WireFormatInfo frame, then a
// ConnectionInfo and a SessionInfo command.  Marks the object initialized.
void
CoreLibImpl::initialize(Buffer& b) {
    // The first step is wire format negotiation - though it's not really -
    // see https://issues.apache.org/activemq/browse/AMQ-681
    // In any case, the first packet sent is a WireFormatInfo packet
    Command::WireFormatInfo wfi;

    // It has a magic string prefix
    vector magic(activemq.begin(), activemq.end());
    wfi.setMagic(magic);

    // OpenWire version
    wfi.setVersion(OPENWIRE_VERSION);

    // We want "tight" encoding, which is length-prefixed
    PrimitiveMap optionMap;
    optionMap.putBoolean("TightEncodingEnabled", true);
    Buffer marshalledOptions;
    optionMap.marshal(marshalledOptions);
    wfi.setMarshalledProperties(marshalledOptions);

    // marshalCommand_(wfi, b);
    loosemarshalcommand(wfi, b);

    // The next command sets up our JMS Connection
    Command::ConnectionInfo cinfo;
    cinfo.setUserName(user_);
    cinfo.setPassword(password_);
    cinfo.setClientId(clientId_);
    shared_ptr cid(new Command::ConnectionId());
    cid->setValue(connectionId_);
    cinfo.setConnectionId(cid);
    marshalCommand_(cinfo, b);

    // Now we set up the JMS Session
    Command::SessionInfo sinfo;
    shared_ptr sid(new Command::SessionId());
    sid->setConnectionId(connectionId_);
    sid->setValue(sessionId_);
    sinfo.setSessionId(sid);
    marshalCommand_(sinfo, b);

    initialized_ = true;
}

// Publishes m to the destination stored in the message itself.
void
CoreLibImpl::publish(const Message& m, Buffer& b) {
    publish(m.getDestination(), m, b);
}

// Marshals m onto b as a non-persistent message addressed to d.  The
// first call also emits a ProducerInfo that creates this connection's
// producer.  Any pending commands are flushed onto b first.
// Throws Exception when m is neither a text nor a bytes message.
void
CoreLibImpl::publish(const Destination& d, const Message& m, Buffer &b) {
    // Reject unsupported message types before anything is written to b;
    // otherwise the command object below would stay NULL and be
    // dereferenced.
    const bool isText = (m.getType() == Command::Types::ACTIVEMQ_TEXT_MESSAGE);
    const bool isBytes = !isText && (m.getType() == Command::Types::ACTIVEMQ_BYTES_MESSAGE);
    if (!isText && !isBytes)
        throw Exception("Cannot publish message of unknown/unimplemented type");

    // Offload any pending data now that we have the opportunity
    marshalPending_(b);

    shared_ptr dest(d.createCommandInstance().release());

    // Create our producer if we haven't already
    if (producerId_.get() == NULL) {
        Command::ProducerInfo pi;
        shared_ptr pid(new Command::ProducerId());
        pid->setConnectionId(connectionId_);
        pid->setSessionId(sessionId_);
        pid->setValue(nextProducerId_++);
        producerId_ = pid;
        pi.setProducerId(producerId_);
        marshalCommand_(pi, b);
    }

    auto_ptr message;
    if (isText)
        message.reset(new Command::ActiveMQTextMessage());
    else
        message.reset(new Command::ActiveMQBytesMessage());

    message->setProducerId(producerId_);
    message->setDestination(dest);
    message->setPersistent(false);

    // Only attach a reply-to destination when the message names one
    if (!(m.getReplyTo().getName().empty())) {
        shared_ptr replyTo(m.getReplyTo().createCommandInstance().release());
        message->setReplyTo(replyTo);
    }

    shared_ptr mid(new Command::MessageId());
    mid->setProducerId(producerId_);
    message->setMessageId(mid);

    Buffer content;
    // serialize the message into the buffer
    m.marshall(content);
    message->setContent(content);

    marshalCommand_(*message, b);
}

// Appends the shutdown sequence to b: RemoveInfo for the session,
// RemoveInfo for the connection, then ShutdownInfo.  Pending commands are
// flushed first.  Clears the initialized flag.
void
CoreLibImpl::disconnect(Buffer& b) {
    marshalPending_(b);

    // remove the sessionid
    Command::RemoveInfo r;
    shared_ptr sidptr(new Command::SessionId());
    sidptr->setValue(sessionId_);
    sidptr->setConnectionId(connectionId_);
    r.setObjectId(sidptr);
    marshalCommand_(r, b);

    // remove the connectionid
    Command::RemoveInfo r2;
    shared_ptr cidptr(new Command::ConnectionId());
    cidptr->setValue(connectionId_);
    r2.setObjectId(cidptr);
    marshalCommand_(r2, b);

    // send shutdowninfo
    Command::ShutdownInfo si;
    marshalCommand_(si, b);

    initialized_ = false;
}

// Invalidates and deregisters every outstanding consumer reference, then
// invalidates every registered destination.
CoreLibImpl::~CoreLibImpl() {
    // deregisterRef() removes the reference from consumerRefs_, which
    // would invalidate an iterator pointing at it - so drain from the
    // front instead of iterating.
    while (!consumerRefs_.empty()) {
        MessageConsumerRef *ref = consumerRefs_.front();
        if (ref == NULL) {
            // deregisterRef() ignores NULL; drop it here so the loop ends
            consumerRefs_.pop_front();
            continue;
        }
        ref->invalidate();
        deregisterRef(ref);
    }

    for (list::iterator i = allRegisteredDestinations_.begin();
         i != allRegisteredDestinations_.end(); ++i) {
        const_cast(*i)->invalidate();
    }
}

// Associates q's consumer with destination d and appends a ConsumerInfo
// for d to b.  A consumer id is allocated for d the first time it is
// subscribed; later subscriptions reuse it.
void
CoreLibImpl::subscribe(const Destination& d, MessageConsumerRef& q, Buffer& b) {
    marshalPending_(b);

    MessageConsumer *mcptr = q.getConsumer();
    destinationMaps_.insert(pair(d, mcptr));

    if (consumerIds_.count(d) == 0) {
        // make a new consumer id if there isn't one for this destination
        shared_ptr cid(new Command::ConsumerId());
        cid->setConnectionId(connectionId_);
        cid->setSessionId(sessionId_);
        cid->setValue(nextConsumerId_++);
        consumerIds_[d] = cid;
    }

    Command::ConsumerInfo si;
    shared_ptr dest(d.createCommandInstance().release());
    si.setConsumerId(consumerIds_[d]);
    si.setDestination(dest);
    si.setPrefetchSize(PREFETCH_SIZE);
    si.setDispatchAsync(true);
    marshalCommand_(si, b);
}

// Drops messages queued for d on its consumer and, if a consumer id
// exists for d, appends a RemoveInfo for it to b and forgets the id.
// Throws Exception when d has no entry in destinationMaps_.
// NOTE(review): the destinationMaps_ entry itself is not erased here -
// confirm that is intended.
void
CoreLibImpl::unsubscribe(const Destination& d, Buffer& b) {
    marshalPending_(b);

    map::iterator i = destinationMaps_.find(d);
    if (i == destinationMaps_.end())
        throw Exception("Not subscribed to destination " + d.getName());

    i->second->removeQueued(d);

    if (consumerIds_.count(d) != 0) {
        // Remove the consumer for this destination
        map >::iterator di = consumerIds_.find(d);
        shared_ptr ourcid(di->second);
        Command::RemoveInfo ri;
        ri.setObjectId(ourcid);
        marshalCommand_(ri, b);
        consumerIds_.erase(di);
    }
}

// Convenience overload: feeds the whole of incoming to the raw-pointer
// handleData().  An empty buffer is a no-op.
void
CoreLibImpl::handleData(const Buffer& incoming, Buffer& b) {
    // Indexing element 0 of an empty buffer is undefined behaviour
    if (incoming.empty())
        return;
    handleData(&(incoming.operator[](0)), incoming.size(), b);
}

// Decodes one complete frame body (type byte followed by the marshalled
// command) and acts on it.  The type byte is removed from buf.
//   BROKER_INFO      - ignored
//   SHUTDOWN_INFO    - logs and appends the disconnect sequence to b
//   MESSAGE_DISPATCH - appends a MessageAck to b and enqueues the message
//                      on the consumer registered for its destination
// Other command types are silently ignored.
// Throws Exception when buf is empty.
void
CoreLibImpl::unmarshalBuffer(vector& buf, Buffer& b) {
    if (buf.empty())
        throw Exception("Empty message - corrupt data stream");

    int type = buf[0];
    buf.erase(buf.begin());

    switch (type) {
    case Command::Types::BROKER_INFO: {
        // BrokerInfo
        // Ignore it, nothing to really do with it
    }
        break;

    case Command::Types::SHUTDOWN_INFO: {
        // ShutdownInfo, broker is exiting
        Command::ShutdownInfo s;
        unmarshalCommand_(s, buf);
        INFORM("Got ShutdownInfo, shutting down");
        disconnect(b);
    }
        break;

    case Command::Types::MESSAGE_DISPATCH: {
        // MessageDispatch, new message arriving
        Command::MessageDispatch md;
        unmarshalCommand_(md, buf);
        shared_ptr m = md.getMessage();
        if (m.get() != NULL) {
            shared_ptr dest = m->getDestination();
            if (dest.get() != NULL) {
                // Ack the message
                Command::MessageAck ack;
                auto_ptr destToAck;
                Destination myd;
                switch (dest->getCommandType()) {
                case Command::Types::ACTIVEMQ_TOPIC:
                    destToAck.reset(new Command::ActiveMQTopic());
                    myd = Destination(parent_, dest->getPhysicalName(),
                                      false /* isTemp */, true /* isTopic */);
                    break;
                case Command::Types::ACTIVEMQ_QUEUE:
                    destToAck.reset(new Command::ActiveMQQueue());
                    myd = Destination(parent_, dest->getPhysicalName(),
                                      false /* isTemp */, false /* isTopic */);
                    break;
                case Command::Types::ACTIVEMQ_TEMP_TOPIC:
                    destToAck.reset(new Command::ActiveMQTempTopic());
                    myd = Destination(parent_, dest->getPhysicalName(),
                                      true /* isTemp */, true /* isTopic */);
                    break;
                case Command::Types::ACTIVEMQ_TEMP_QUEUE:
                    destToAck.reset(new Command::ActiveMQTempQueue());
                    myd = Destination(parent_, dest->getPhysicalName(),
                                      true /* isTemp */, false /* isTopic */);
                    break;
                default:
                    // Without this, destToAck would stay NULL and be
                    // dereferenced just below.
                    WARNING("Unknown/unimplemented destination command type");
                    return;
                }
                destToAck->setPhysicalName(dest->getPhysicalName());

                shared_ptr cidToAck(consumerIds_[myd]);
                ack.setConsumerId(cidToAck);
                ack.setDestination(shared_ptr(destToAck));
                ack.setAckType(STANDARD_ACK); // tells broker to discard the message
                ack.setMessageCount(1);
                marshalCommand_(ack, b);

                const string& ourdestname(dest->getPhysicalName());
                map::iterator i = destinationMaps_.find(myd);
                if (i == destinationMaps_.end()) {
                    WARNING("No MessageConsumer registered for received message on destination " + ourdestname);
                } else {
                    int mtype = m->getCommandType();
                    auto_ptr toenqueue;
                    switch (mtype) {
                    case Command::Types::ACTIVEMQ_TEXT_MESSAGE:
                        toenqueue.reset(new TextMessage(m->getContent()));
                        break;
                    case Command::Types::ACTIVEMQ_BYTES_MESSAGE:
                        toenqueue.reset(new BytesMessage(m->getContent()));
                        break;
                    default:
                        WARNING("Unknown/unimplemented message command type");
                        return;
                    }

                    toenqueue->setDestination(myd);
                    if (m->getReplyTo() != NULL) {
                        // Rebuild the reply-to Destination from the
                        // command type of the wire-level destination
                        const bool rtoIsTemp =
                            m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_QUEUE ||
                            m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC;
                        const bool rtoIsTopic =
                            m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC ||
                            m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TOPIC;
                        Destination rto(parent_, m->getReplyTo()->getPhysicalName(),
                                        rtoIsTemp, rtoIsTopic);
                        toenqueue->setReplyTo(rto);
                    }
                    // enqueue() receives the released raw pointer
                    i->second->enqueue(toenqueue.release());
                }
            }
        }
    }
        break;
    }
}

// Feeds len raw bytes received from the broker into the frame
// reassembler.  Frames are a 4-byte network-order length followed by that
// many bytes; the input may contain partial frames, exactly one frame, or
// several frames.  Each completed frame is passed to unmarshalBuffer(),
// which may append replies to b.  Partial state (size prefix bytes and
// frame body so far) is kept in members between calls.
// Throws Exception on a frame whose declared length is zero.
void
CoreLibImpl::handleData(const uint8_t *buf, size_t len, Buffer& b) {
    // One iteration per frame found in the input
    for (;;) {
        if (!inMessage_) {
            uint32_t msgsize = 0;
            if (sizeBufPos_ == 0) {
                if (len < 4) {
                    // Not even a full size prefix yet - stash what we have
                    memcpy(sizeBuf_, buf, len);
                    sizeBufPos_ += len;
                    return;
                }
                // memcpy rather than a pointer cast: buf need not be
                // suitably aligned for a uint32_t read
                memcpy(&msgsize, buf, 4);
                buf += 4;
                len -= 4;
            } else {
                if (len + sizeBufPos_ < 4) {
                    memcpy(sizeBuf_ + sizeBufPos_, buf, len);
                    sizeBufPos_ += len;
                    return;
                }
                // Complete the size prefix started by an earlier call
                const size_t needed = 4 - sizeBufPos_;
                memcpy(sizeBuf_ + sizeBufPos_, buf, needed);
                memcpy(&msgsize, sizeBuf_, 4);
                buf += needed;
                len -= needed;
            }
            msgsize = ntohl(msgsize);

            if (msgsize == 0)
                throw Exception("Zero-length message - corrupt data stream");

            unmarshalBuffer_.resize(0);
            unmarshalBuffer_.reserve(msgsize);
            yetToRecv_ = msgsize;
            inMessage_ = true;
            sizeBufPos_ = 0;
        }

        if (len < yetToRecv_) {
            // not yet - keep what arrived and wait for more
            unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + len);
            yetToRecv_ -= len;
            return;
        }

        // The input completes the current frame (and may hold more)
        const size_t thismsg = yetToRecv_; // bytes of this input that belong to the current frame
        inMessage_ = false;
        unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + thismsg);
        unmarshalBuffer(unmarshalBuffer_, b);

        buf += thismsg;
        len -= thismsg;
        if (len == 0)
            return; // exact match, nothing left over
        // otherwise loop to process the excess bytes as the next frame
    }
}

// Creates a BlockingMessageConsumer, records it in consumers_ with a
// reference count of zero, and returns a reference object wrapping it.
BlockingMessageConsumerRef
CoreLibImpl::newBlockingMessageConsumer(void) {
    BlockingMessageConsumer *q = new BlockingMessageConsumer();
    consumers_.push_back(q);
    refCounts_[q] = 0;
    return BlockingMessageConsumerRef(parent_, q);
}

// Creates a NonBlockingMessageConsumer, records it in consumers_ with a
// reference count of zero, and returns a reference object wrapping it.
NonBlockingMessageConsumerRef
CoreLibImpl::newNonBlockingMessageConsumer(void) {
    NonBlockingMessageConsumer *q = new NonBlockingMessageConsumer();
    consumers_.push_back(q);
    refCounts_[q] = 0;
    return NonBlockingMessageConsumerRef(parent_, q);
}

// Records q as a live reference and bumps the reference count of the
// consumer it points at.  NULL is ignored.
void
CoreLibImpl::registerRef(MessageConsumerRef *q) {
    if (NULL == q)
        return;
    MessageConsumer *mq = q->getConsumer();
    consumerRefs_.push_back(q);
    ++refCounts_[mq];
}

// Drops q from the set of live references and decrements the reference
// count of its consumer.  When the count reaches zero the consumer is
// removed from every destination mapping and from consumers_, and is
// deleted.  NULL is ignored.
void
CoreLibImpl::deregisterRef(MessageConsumerRef *q) {
    if (NULL == q)
        return;
    MessageConsumer *mq = q->getConsumer();
    int count = --refCounts_[mq];
    if (count == 0) {
        refCounts_.erase(mq);

        // delete all references to the consumer
        for (map::iterator i = destinationMaps_.begin();
             i != destinationMaps_.end();) {
            // post-increment keeps the loop iterator valid across erase
            if (i->second == mq)
                destinationMaps_.erase(i++);
            else
                ++i;
        }
        consumers_.remove(mq);

        // delete the consumer
        delete mq;
    }
    consumerRefs_.remove(q);
}

// Replaces the current logger, taking ownership of lgr.
// NOTE(review): an empty auto_ptr leaves logger_ NULL; the logging macros
// tolerate that but getLogger() would dereference it - confirm callers
// never pass one.
void
CoreLibImpl::setLogger(auto_ptr lgr) {
    logger_ = lgr;
}

// Returns the currently installed logger.
Logger&
CoreLibImpl::getLogger() {
    return *logger_;
}

// Counts one more live Destination object for d's string form and
// remembers the object's address so the destructor can invalidate it.
void
CoreLibImpl::registerDest(const Destination& d) {
    // operator[] value-initializes a missing count to 0, so this both
    // creates the entry at 1 and increments an existing one.
    ++destRefCounts_[d.toString()];
    allRegisteredDestinations_.push_back(&d);
}

// Counts one less live Destination object for d's string form.  When the
// last one goes away, queues (on pendingBuffer_) a RemoveInfo for the
// destination's consumer if it has one, and - for temporary destinations
// whose name contains our connection id - a DestinationInfo that removes
// the destination.  Unknown destinations are ignored.
void
CoreLibImpl::unregisterDest(const Destination& d) {
    map::iterator i = destRefCounts_.find(d.toString());
    if (i == destRefCounts_.end())
        return;

    allRegisteredDestinations_.remove(&d);

    if (i->second == 1) {
        // This is the last reference to this destination
        map >::iterator cii = consumerIds_.find(d);

        // delete the relevant consumer, if there is one
        if (cii != consumerIds_.end()) {
            shared_ptr ci(cii->second);
            Command::RemoveInfo r;
            r.setObjectId(ci);
            marshalCommand_(r, pendingBuffer_);
            consumerIds_.erase(cii);
        }

        // delete the destination, if it's our responsibility
        // we know we created a temporary destination if our connection ID is in it
        if (d.isTemporary() && d.getName().find(connectionId_) != string::npos) {
            Command::DestinationInfo di;
            shared_ptr cid(new Command::ConnectionId());
            cid->setValue(connectionId_);
            shared_ptr dest(d.createCommandInstance().release());
            di.setDestination(dest);
            di.setOperationType(REMOVE_DEST);
            di.setConnectionId(cid);
            marshalCommand_(di, pendingBuffer_);
        }

        destRefCounts_.erase(i);
    }
    else
        i->second--;
}

// Creates a temporary topic named "<connectionId>:<n>" and queues (on
// pendingBuffer_) the DestinationInfo that adds it on the broker.
Destination
CoreLibImpl::createTemporaryTopic() {
    stringstream str;
    str << connectionId_ << ":" << nextTempDestId_++;
    Destination ret(parent_, str.str(), true /* isTemp */, true /* isTopic */);

    Command::DestinationInfo di;
    shared_ptr cid(new Command::ConnectionId());
    cid->setValue(connectionId_);
    shared_ptr dest(new Command::ActiveMQTempTopic());
    dest->setPhysicalName(ret.getName());
    di.setDestination(dest);
    di.setOperationType(ADD_DEST);
    di.setConnectionId(cid);
    marshalCommand_(di, pendingBuffer_);

    return ret;
}

// Creates a temporary queue named "<connectionId>:<n>" and queues (on
// pendingBuffer_) the DestinationInfo that adds it on the broker.
Destination
CoreLibImpl::createTemporaryQueue() {
    stringstream str;
    str << connectionId_ << ":" << nextTempDestId_++;
    Destination ret(parent_, str.str(), true /* isTemp */, false /* isTopic */);

    Command::DestinationInfo di;
    shared_ptr cid(new Command::ConnectionId());
    cid->setValue(connectionId_);
    shared_ptr dest(new Command::ActiveMQTempQueue());
    dest->setPhysicalName(ret.getName());
    di.setDestination(dest);
    di.setOperationType(ADD_DEST);
    di.setConnectionId(cid);
    marshalCommand_(di, pendingBuffer_);

    return ret;
}

// Returns a handle for the (non-temporary) topic with the given name.
// Nothing is sent to the broker.
Destination
CoreLibImpl::createTopic(const std::string& name) {
    return Destination(parent_, name, false /* isTemp */, true /* isTopic */);
}

// Returns a handle for the (non-temporary) queue with the given name.
// Nothing is sent to the broker.
Destination
CoreLibImpl::createQueue(const std::string& name) {
    return Destination(parent_, name, false /* isTemp */, false /* isTopic */);
}