mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5665 - memory store don't propagate connection context
This commit is contained in:
parent
ab8f54b066
commit
69767a2f2f
|
@ -140,23 +140,23 @@ public class MemoryTransactionStore implements TransactionStore {
|
||||||
ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
|
ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
|
||||||
@Override
|
@Override
|
||||||
public void addMessage(ConnectionContext context, final Message send) throws IOException {
|
public void addMessage(ConnectionContext context, final Message send) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), send);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
|
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), send);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
|
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), message);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
|
||||||
return new InlineListenableFuture();
|
return new InlineListenableFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
|
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), message);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
|
||||||
return new InlineListenableFuture();
|
return new InlineListenableFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,23 +181,23 @@ public class MemoryTransactionStore implements TransactionStore {
|
||||||
ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
|
ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
|
||||||
@Override
|
@Override
|
||||||
public void addMessage(ConnectionContext context, final Message send) throws IOException {
|
public void addMessage(ConnectionContext context, final Message send) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), send);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
|
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), send);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
|
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), message);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
|
||||||
return new InlineListenableFuture();
|
return new InlineListenableFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
|
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
|
||||||
MemoryTransactionStore.this.addMessage(getDelegate(), message);
|
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
|
||||||
return new InlineListenableFuture();
|
return new InlineListenableFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,7 +310,7 @@ public class MemoryTransactionStore implements TransactionStore {
|
||||||
* @param message
|
* @param message
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void addMessage(final MessageStore destination, final Message message) throws IOException {
|
void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
|
||||||
|
|
||||||
if (doingRecover) {
|
if (doingRecover) {
|
||||||
return;
|
return;
|
||||||
|
@ -340,7 +340,7 @@ public class MemoryTransactionStore implements TransactionStore {
|
||||||
|
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
destination.addMessage(null, message);
|
destination.addMessage(context, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue