r241@34: chirino | 2007-02-23 14:49:05 -0500

Fixed some Stream tests that broke a little with the latest changes..
 Added better network flow control.
 


git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-02-23 20:24:30 +00:00
parent efebbf70b8
commit 6916299290
10 changed files with 126 additions and 132 deletions

View File

@ -50,8 +50,6 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
// These are the messages waiting to be delivered to the client // These are the messages waiting to be delivered to the client
private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
private int deliveredCounter = 0;
private MessageDispatch lastDelivered;
private boolean eosReached; private boolean eosReached;
private byte buffer[]; private byte buffer[];
private int pos; private int pos;
@ -117,10 +115,6 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
public void close() throws IOException { public void close() throws IOException {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
try { try {
if (lastDelivered != null) {
MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
connection.asyncSendPacket(ack);
}
dispose(); dispose();
this.connection.syncSendPacket(info.createRemoveCommand()); this.connection.syncSendPacket(info.createRemoveCommand());
} catch (JMSException e) { } catch (JMSException e) {
@ -150,16 +144,8 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired())
return null; return null;
deliveredCounter++; MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { connection.asyncSendPacket(ack);
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
connection.asyncSendPacket(ack);
deliveredCounter = 0;
lastDelivered = null;
} else {
lastDelivered = md;
}
return (ActiveMQMessage) md.getMessage(); return (ActiveMQMessage) md.getMessage();
} }

View File

@ -582,6 +582,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispose() throws JMSException { public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
//log.warn("Consumer is being disposed.", new Exception("trace exception."));
// Do we have any acks we need to send out before closing? // Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still // Ack any delivered messages now. (session may still
// commit/rollback the acks). // commit/rollback the acks).
@ -833,31 +836,36 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispatch(MessageDispatch md) { public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener; MessageListener listener = this.messageListener;
try { try {
synchronized(unconsumedMessages.getMutex()){ synchronized(unconsumedMessages.getMutex()){
if (!unconsumedMessages.isClosed()) { if (unconsumedMessages.isClosed()) {
if (listener != null && unconsumedMessages.isRunning() ) { return;
ActiveMQMessage message = createActiveMQMessage(md); }
beforeMessageIsConsumed(md); if (listener == null || !unconsumedMessages.isRunning() ) {
try { unconsumedMessages.enqueue(md);
listener.onMessage(message); if (availableListener != null) {
afterMessageIsConsumed(md, false); availableListener.onMessageAvailable(this);
} catch (RuntimeException e) { }
if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { return;
// Redeliver the message }
} else { }
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false); ActiveMQMessage message = createActiveMQMessage(md);
} beforeMessageIsConsumed(md);
log.warn("Exception while processing message: " + e, e); try {
} listener.onMessage(message);
} else { afterMessageIsConsumed(md, false);
unconsumedMessages.enqueue(md); } catch (RuntimeException e) {
if (availableListener != null) { if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
availableListener.onMessageAvailable(this); // Redeliver the message
} } else {
} // Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false);
} }
} log.warn("Exception while processing message: " + e, e);
}
} catch (Exception e) { } catch (Exception e) {
session.connection.onAsyncException(e); session.connection.onAsyncException(e);
} }

View File

@ -790,13 +790,10 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
MessageDispatch md=(MessageDispatch) command; MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer(); Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md); broker.processDispatch(md);
try{ if(sub!=null){
dispatch(command); sub.run();
}finally{
if(sub!=null){
sub.run();
}
} }
dispatch(command);
} else if( command.isShutdownInfo() ) { } else if( command.isShutdownInfo() ) {
dispatch(command); dispatch(command);
dispatchStopped.countDown(); dispatchStopped.countDown();

View File

@ -71,4 +71,11 @@ public interface ConnectionViewMBean extends Service {
*/ */
public String getRemoteAddress(); public String getRemoteAddress();
/**
* Returns the JMS connection id for this connection
*
* @return the JMS connection id for this connection
*/
public String getConnectionId();
} }

View File

@ -280,8 +280,8 @@ public class Queue implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception { public void send(final ConnectionContext context, final Message message) throws Exception {
if (context.isProducerFlowControl() && !context.isNetworkConnection()) { if (context.isProducerFlowControl() ) {
if( message.isResponseRequired() ) { if( message.isResponseRequired() || context.isNetworkConnection() ) {
if( usageManager.isFull() ) { if( usageManager.isFull() ) {
// System.out.println("Registering callback..."); // System.out.println("Registering callback...");
Runnable callback = new Runnable() { Runnable callback = new Runnable() {

View File

@ -38,7 +38,6 @@ public class TempQueueRegion extends AbstractRegion {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination Policy // We should allow the following to be configurable via a Destination Policy
// setAutoCreateDestinations(false); // setAutoCreateDestinations(false);
System.out.println("test");
} }
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {

View File

@ -232,7 +232,7 @@ public class Topic implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception { public void send(final ConnectionContext context, final Message message) throws Exception {
if (context.isProducerFlowControl() && !context.isNetworkConnection() ) { if (context.isProducerFlowControl() ) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else { } else {

View File

@ -490,8 +490,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message); log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
if( !( message.isResponseRequired() || message.getDestination().isQueue() ) ) {
if( !message.isResponseRequired() ) {
// If the message was originally sent using async send, we will preserve that QOS // If the message was originally sent using async send, we will preserve that QOS
// by bridging it using an async send (small chance of message loss). // by bridging it using an async send (small chance of message loss).

View File

@ -17,6 +17,7 @@ package org.apache.activemq;
* limitations under the License. * limitations under the License.
*/ */
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -59,99 +60,96 @@ public final class LargeStreamletTest extends TestCase {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
BROKER_URL); BROKER_URL);
final ActiveMQConnection connection = (ActiveMQConnection) factory final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
.createConnection();
connection.start(); connection.start();
final ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection();
connection2.start();
try { try {
final Session session = connection.createSession(false, final Destination destination = new ActiveMQQueue("streamtest");
Session.AUTO_ACKNOWLEDGE); final Thread readerThread = new Thread(new Runnable() {
try {
final Destination destination = session.createQueue("wibble");
final Thread readerThread = new Thread(new Runnable() {
public void run() { public void run() {
totalRead.set(0); totalRead.set(0);
try {
final InputStream inputStream = connection
.createInputStream(destination);
try { try {
final InputStream inputStream = connection int read;
.createInputStream(destination); final byte[] buf = new byte[BUFFER_SIZE];
try { while (!stopThreads.get()
int read; && (read = inputStream.read(buf)) != -1) {
final byte[] buf = new byte[BUFFER_SIZE]; totalRead.addAndGet(read);
while (!stopThreads.get()
&& (read = inputStream.read(buf)) != -1) {
totalRead.addAndGet(read);
}
} finally {
inputStream.close();
} }
} catch (Exception e) {
readerException = e;
e.printStackTrace();
} finally { } finally {
log.info(totalRead + " total bytes read."); inputStream.close();
} }
} catch (Exception e) {
readerException = e;
e.printStackTrace();
} finally {
log.info(totalRead + " total bytes read.");
} }
});
final Thread writerThread = new Thread(new Runnable() {
public void run() {
totalWritten.set(0);
int count = MESSAGE_COUNT;
try {
final OutputStream outputStream = connection
.createOutputStream(destination);
try {
final byte[] buf = new byte[BUFFER_SIZE];
new Random().nextBytes(buf);
while (count > 0 && !stopThreads.get()) {
outputStream.write(buf);
totalWritten.addAndGet(buf.length);
count--;
}
} finally {
outputStream.close();
}
} catch (Exception e) {
writerException = e;
e.printStackTrace();
} finally {
log.info(totalWritten
+ " total bytes written.");
}
}
});
readerThread.start();
writerThread.start();
// Wait till reader is has finished receiving all the messages or he has stopped
// receiving messages.
Thread.sleep(1000);
int lastRead = totalRead.get();
while( readerThread.isAlive() ) {
readerThread.join(1000);
// No progress?? then stop waiting..
if( lastRead == totalRead.get() ) {
break;
}
lastRead = totalRead.get();
} }
});
stopThreads.set(true); final Thread writerThread = new Thread(new Runnable() {
assertTrue("Should not have received a reader exception", readerException == null); public void run() {
assertTrue("Should not have received a writer exception", writerException == null); totalWritten.set(0);
int count = MESSAGE_COUNT;
try {
final OutputStream outputStream = connection2
.createOutputStream(destination);
try {
final byte[] buf = new byte[BUFFER_SIZE];
new Random().nextBytes(buf);
while (count > 0 && !stopThreads.get()) {
outputStream.write(buf);
totalWritten.addAndGet(buf.length);
count--;
}
} finally {
outputStream.close();
}
} catch (Exception e) {
writerException = e;
e.printStackTrace();
} finally {
log.info(totalWritten
+ " total bytes written.");
}
}
});
Assert.assertEquals("Not all messages accounted for", readerThread.start();
totalWritten.get(), totalRead.get()); writerThread.start();
} finally {
session.close(); // Wait till reader is has finished receiving all the messages or he has stopped
// receiving messages.
Thread.sleep(1000);
int lastRead = totalRead.get();
while( readerThread.isAlive() ) {
readerThread.join(1000);
// No progress?? then stop waiting..
if( lastRead == totalRead.get() ) {
break;
}
lastRead = totalRead.get();
} }
stopThreads.set(true);
assertTrue("Should not have received a reader exception", readerException == null);
assertTrue("Should not have received a writer exception", writerException == null);
Assert.assertEquals("Not all messages accounted for",
totalWritten.get(), totalRead.get());
} finally { } finally {
connection.close(); connection.close();
connection2.close();
} }
} }

View File

@ -135,7 +135,7 @@ public class ProducerFlowControlTest extends JmsTestSupport {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService(); BrokerService service = new BrokerService();
service.setPersistent(false); service.setPersistent(false);
service.setUseJmx(false); service.setUseJmx(true);
// Setup a destination policy where it takes only 1 message at a time. // Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap(); PolicyMap policyMap = new PolicyMap();