diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index a444d61c33..e9070b37ca 100755 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -40,9 +40,9 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.transport.failover.FailoverTransport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.activemq.util.XASupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +71,7 @@ public class TransactionContext implements XAResource { // XATransactionId -> ArrayList of TransactionContext objects private final static HashMap> ENDED_XA_TRANSACTION_CONTEXTS = - new HashMap>(); + new HashMap>(); private ActiveMQConnection connection; private final LongSequenceGenerator localTransactionIdGenerator; @@ -95,17 +95,17 @@ public class TransactionContext implements XAResource { public boolean isInXATransaction() { if (transactionId != null && transactionId.isXATransaction()) { - return true; + return true; } else { - if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - for(List transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { - if (transactions.contains(this)) { - return true; - } - } - } - } + if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + for(List transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { + if (transactions.contains(this)) { + return true; + } + } + } + } } return false; @@ -161,7 +161,7 @@ public class TransactionContext implements XAResource { try { synchronizations.get(i).afterRollback(); } catch (Throwable t) { - LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); + LOG.debug("Exception from afterRollback on {}", synchronizations.get(i), t); if (firstException == null) { firstException = t; } @@ -184,7 +184,7 @@ public class TransactionContext implements XAResource { try { synchronizations.get(i).afterCommit(); } catch (Throwable t) { - LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); + LOG.debug("Exception from afterCommit on {}", synchronizations.get(i), t); if (firstException == null) { firstException = t; } @@ -245,11 +245,9 @@ public class TransactionContext implements XAResource { if (localTransactionEventListener != null) { localTransactionEventListener.beginEvent(); } - if (LOG.isDebugEnabled()) { - LOG.debug("Begin:" + transactionId); - } - } + LOG.debug("Begin:{}", transactionId); + } } /** @@ -272,11 +270,8 @@ public class TransactionContext implements XAResource { LOG.warn("rollback processing error", canOcurrOnFailover); } if (transactionId != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Rollback: " + transactionId - + " syncCount: " - + (synchronizations != null ? synchronizations.size() : 0)); - } + LOG.debug("Rollback: {} syncCount: {}", + transactionId, (synchronizations != null ? synchronizations.size() : 0)); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); this.transactionId = null; @@ -314,11 +309,8 @@ public class TransactionContext implements XAResource { // Only send commit if the transaction was started. if (transactionId != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Commit: " + transactionId - + " syncCount: " - + (synchronizations != null ? synchronizations.size() : 0)); - } + LOG.debug("Commit: {} syncCount: {}", + transactionId, (synchronizations != null ? synchronizations.size() : 0)); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); this.transactionId = null; @@ -330,7 +322,7 @@ public class TransactionContext implements XAResource { } afterCommit(); } catch (JMSException cause) { - LOG.info("commit failed for transaction " + info.getTransactionId(), cause); + LOG.info("commit failed for transaction {}", info.getTransactionId(), cause); if (localTransactionEventListener != null) { localTransactionEventListener.rollbackEvent(); } @@ -349,11 +341,11 @@ public class TransactionContext implements XAResource { /** * Associates a transaction with the resource. */ + @Override public void start(Xid xid, int flags) throws XAException { - if (LOG.isDebugEnabled()) { - LOG.debug("Start: " + xid + ", flags:" + flags); - } + LOG.debug("Start: {}, flags: {}", xid, XASupport.toString(flags)); + if (isInLocalTransaction()) { throw new XAException(XAException.XAER_PROTO); } @@ -365,7 +357,7 @@ public class TransactionContext implements XAResource { // if ((flags & TMJOIN) == TMJOIN) { // TODO: verify that the server has seen the xid // // } - // if ((flags & TMJOIN) == TMRESUME) { + // if ((flags & TMRESUME) == TMRESUME) { // // TODO: verify that the xid was suspended. // } @@ -382,11 +374,10 @@ public class TransactionContext implements XAResource { return connection.getConnectionInfo().getConnectionId(); } + @Override public void end(Xid xid, int flags) throws XAException { - if (LOG.isDebugEnabled()) { - LOG.debug("End: " + xid + ", flags:" + flags); - } + LOG.debug("End: {}, flags: {}", xid, XASupport.toString(flags)); if (isInLocalTransaction()) { throw new XAException(XAException.XAER_PROTO); @@ -434,10 +425,9 @@ public class TransactionContext implements XAResource { && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); } + @Override public int prepare(Xid xid) throws XAException { - if (LOG.isDebugEnabled()) { - LOG.debug("Prepare: " + xid); - } + LOG.debug("Prepare: {}", xid); // We allow interleaving multiple transactions, so // we don't limit prepare to the associated xid. @@ -458,41 +448,38 @@ public class TransactionContext implements XAResource { IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); if (XAResource.XA_RDONLY == response.getResult()) { // transaction stops now, may be syncs that need a callback - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - if (l != null && !l.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid); - } - for (TransactionContext ctx : l) { - ctx.afterCommit(); - } - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); + for (TransactionContext ctx : l) { + ctx.afterCommit(); + } + } + } } return response.getResult(); } catch (JMSException e) { LOG.warn("prepare of: " + x + " failed with: " + e, e); - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - if (l != null && !l.isEmpty()) { - for (TransactionContext ctx : l) { - try { - ctx.afterRollback(); - } catch (Throwable ignored) { - if (LOG.isDebugEnabled()) { - LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + - x + ", context: " + ctx, ignored); - } - } - } - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + try { + ctx.afterRollback(); + } catch (Throwable ignored) { + LOG.debug("failed to firing afterRollback callbacks on prepare " + + "failure, txid: {}, context: {}", x, ctx, ignored); + } + } + } + } throw toXAException(e); } } + @Override public void rollback(Xid xid) throws XAException { if (LOG.isDebugEnabled()) { @@ -521,25 +508,24 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); syncSendPacketWithInterruptionHandling(info); - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - if (l != null && !l.isEmpty()) { - for (TransactionContext ctx : l) { - ctx.afterRollback(); - } - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + ctx.afterRollback(); + } + } + } } catch (JMSException e) { throw toXAException(e); } } // XAResource interface + @Override public void commit(Xid xid, boolean onePhase) throws XAException { - if (LOG.isDebugEnabled()) { - LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); - } + LOG.debug("Commit: {}, onePhase={}", xid, onePhase); // We allow interleaving multiple transactions, so // we don't limit commit to the associated xid. @@ -561,46 +547,42 @@ public class TransactionContext implements XAResource { syncSendPacketWithInterruptionHandling(info); - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - if (l != null && !l.isEmpty()) { - for (TransactionContext ctx : l) { - try { - ctx.afterCommit(); - } catch (Exception ignored) { - LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored); - } - } - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + try { + ctx.afterCommit(); + } catch (Exception ignored) { + LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); + } + } + } + } } catch (JMSException e) { LOG.warn("commit of: " + x + " failed with: " + e, e); if (onePhase) { - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - if (l != null && !l.isEmpty()) { - for (TransactionContext ctx : l) { - try { - ctx.afterRollback(); - } catch (Throwable ignored) { - if (LOG.isDebugEnabled()) { - LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored); - } - } - } - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + try { + ctx.afterRollback(); + } catch (Throwable ignored) { + LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); + } + } + } + } } throw toXAException(e); } - } + @Override public void forget(Xid xid) throws XAException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forget: " + xid); - } + LOG.debug("Forget: {}", xid); // We allow interleaving multiple transactions, so // we don't limit forget to the associated xid. @@ -623,11 +605,12 @@ public class TransactionContext implements XAResource { } catch (JMSException e) { throw toXAException(e); } - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - ENDED_XA_TRANSACTION_CONTEXTS.remove(x); - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + } } + @Override public boolean isSameRM(XAResource xaResource) throws XAException { if (xaResource == null) { return false; @@ -643,6 +626,7 @@ public class TransactionContext implements XAResource { } } + @Override public Xid[] recover(int flag) throws XAException { LOG.debug("recover({})", flag); @@ -667,10 +651,12 @@ public class TransactionContext implements XAResource { } } + @Override public int getTransactionTimeout() throws XAException { return 0; } + @Override public boolean setTransactionTimeout(int seconds) throws XAException { return false; } @@ -702,9 +688,7 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); try { this.connection.asyncSendPacket(info); - if (LOG.isDebugEnabled()) { - LOG.debug("{} started XA transaction {} ", this, transactionId); - } + LOG.debug("{} started XA transaction {}", this, transactionId); } catch (JMSException e) { disassociate(); throw toXAException(e); @@ -716,9 +700,7 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); try { syncSendPacketWithInterruptionHandling(info); - if (LOG.isDebugEnabled()) { - LOG.debug("{} ended XA transaction {}", this, transactionId); - } + LOG.debug("{} ended XA transaction {}", this, transactionId); } catch (JMSException e) { disassociate(); throw toXAException(e); @@ -726,16 +708,16 @@ public class TransactionContext implements XAResource { // Add our self to the list of contexts that are interested in // post commit/rollback events. - synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { - List l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); - if (l == null) { - l = new ArrayList(3); - ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); - l.add(this); - } else if (!l.contains(this)) { - l.add(this); - } - } + synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { + List l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); + if (l == null) { + l = new ArrayList(3); + ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); + l.add(this); + } else if (!l.contains(this)) { + l.add(this); + } + } } disassociate(); @@ -814,7 +796,6 @@ public class TransactionContext implements XAResource { return connection; } - // for RAR xa recovery where xaresource connection is per request public ActiveMQConnection setConnection(ActiveMQConnection connection) { ActiveMQConnection existing = this.connection; diff --git a/activemq-client/src/main/java/org/apache/activemq/util/XASupport.java b/activemq-client/src/main/java/org/apache/activemq/util/XASupport.java new file mode 100644 index 0000000000..86d1f2f6d9 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/util/XASupport.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import static javax.transaction.xa.XAResource.TMENDRSCAN; +import static javax.transaction.xa.XAResource.TMFAIL; +import static javax.transaction.xa.XAResource.TMJOIN; +import static javax.transaction.xa.XAResource.TMNOFLAGS; +import static javax.transaction.xa.XAResource.TMONEPHASE; +import static javax.transaction.xa.XAResource.TMRESUME; +import static javax.transaction.xa.XAResource.TMSTARTRSCAN; +import static javax.transaction.xa.XAResource.TMSUCCESS; +import static javax.transaction.xa.XAResource.TMSUSPEND; + +public class XASupport { + + public static String toString(int flags) { + if (flags == TMNOFLAGS) { + return "TMNOFLAGS"; + } + + StringBuilder result = new StringBuilder(); + if (hasFlag(flags, TMENDRSCAN)) { + add(result, "TMENDRSCAN"); + } + if (hasFlag(flags, TMFAIL)) { + add(result, "TMFAIL"); + } + if (hasFlag(flags, TMJOIN)) { + add(result, "TMJOIN"); + } + if (hasFlag(flags, TMONEPHASE)) { + add(result, "TMONEPHASE"); + } + if (hasFlag(flags, TMRESUME)) { + add(result, "TMRESUME"); + } + if (hasFlag(flags, TMSTARTRSCAN)) { + add(result, "TMSTARTRSCAN"); + } + if (hasFlag(flags, TMSUCCESS)) { + add(result, "TMSUCCESS"); + } + if (hasFlag(flags, TMSUSPEND)) { + add(result, "TMSUSPEND"); + } + + int nonStandardFlags = flags + & ~TMENDRSCAN + & ~TMFAIL + & ~TMJOIN + & ~TMONEPHASE + & ~TMRESUME + & ~TMSTARTRSCAN + & ~TMSUCCESS + & ~TMSUSPEND; + + if (nonStandardFlags != 0) { + add(result, String.format("0x%08x", nonStandardFlags)); + } + + return result.toString(); + } + + private static boolean hasFlag(int flags, int flag) { + return (flags & flag) == flag; + } + + private static void add(StringBuilder result, String string) { + if (result.length() > 0) { + result.append(" | "); + } + result.append(string); + } +} diff --git a/activemq-client/src/test/java/org/apache/activemq/util/XASupportTest.java b/activemq-client/src/test/java/org/apache/activemq/util/XASupportTest.java new file mode 100644 index 0000000000..44539a6cbb --- /dev/null +++ b/activemq-client/src/test/java/org/apache/activemq/util/XASupportTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import static javax.transaction.xa.XAResource.TMENDRSCAN; +import static javax.transaction.xa.XAResource.TMFAIL; +import static javax.transaction.xa.XAResource.TMJOIN; +import static javax.transaction.xa.XAResource.TMNOFLAGS; +import static javax.transaction.xa.XAResource.TMONEPHASE; +import static javax.transaction.xa.XAResource.TMRESUME; +import static javax.transaction.xa.XAResource.TMSTARTRSCAN; +import static javax.transaction.xa.XAResource.TMSUCCESS; +import static javax.transaction.xa.XAResource.TMSUSPEND; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.util.LinkedList; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class XASupportTest { + + private final int flags; + private final String expectedResult; + + public XASupportTest(int flags, String expectedResult) { + this.flags = flags; + this.expectedResult = expectedResult; + } + + @Parameters + public static Iterable parameters() { + List p = new LinkedList(); + + // single values from XAResource + add(p, TMENDRSCAN, "TMENDRSCAN"); + add(p, TMFAIL, "TMFAIL"); + add(p, TMJOIN, "TMJOIN"); + add(p, TMNOFLAGS, "TMNOFLAGS"); + add(p, TMONEPHASE, "TMONEPHASE"); + add(p, TMRESUME, "TMRESUME"); + add(p, TMSTARTRSCAN, "TMSTARTRSCAN"); + add(p, TMSUCCESS, "TMSUCCESS"); + add(p, TMSUSPEND, "TMSUSPEND"); + + // combination of multiple flags + add(p, TMONEPHASE | TMSUCCESS, "TMONEPHASE | TMSUCCESS"); + + // flags not specified in XAResource + add(p, TMSUCCESS | 0x00400000, "TMSUCCESS | 0x00400000"); + + return p; + } + + private static void add(List p, int flags, String expectedResult) { + p.add(new Object[] { flags, expectedResult }); + } + + @Test + public void test() throws Exception { + assertThat(XASupport.toString(flags), is(expectedResult)); + } +} \ No newline at end of file