This commit is contained in:
Clebert Suconic 2018-01-18 10:18:40 -05:00
commit 9f77514225
16 changed files with 354 additions and 61 deletions

View File

@ -736,11 +736,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new IllegalStateException("Message cannot be routed more than once"); throw new IllegalStateException("Message cannot be routed more than once");
} }
setPagingStore(message); setPagingStore(context.getAddress(message), message);
AtomicBoolean startedTX = new AtomicBoolean(false); AtomicBoolean startedTX = new AtomicBoolean(false);
final SimpleString address = message.getAddressSimpleString(); final SimpleString address = context.getAddress(message);
applyExpiryDelay(message, address); applyExpiryDelay(message, address);
@ -750,7 +750,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.cleanupInternalProperties(); message.cleanupInternalProperties();
Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddressSimpleString() : context.getAddress()); Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
// TODO auto-create queues here? // TODO auto-create queues here?
// first check for the auto-queue creation thing // first check for the auto-queue creation thing
@ -854,7 +854,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override @Override
public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception { public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception {
setPagingStore(message); setPagingStore(queue.getAddress(), message);
MessageReference reference = MessageReference.Factory.createReference(message, queue); MessageReference reference = MessageReference.Factory.createReference(message, queue);
@ -1040,8 +1040,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// Private ----------------------------------------------------------------- // Private -----------------------------------------------------------------
private void setPagingStore(final Message message) throws Exception { private void setPagingStore(SimpleString address, Message message) throws Exception {
PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString()); PagingStore store = pagingManager.getPageStore(address);
message.setContext(store); message.setContext(store);
} }
@ -1122,7 +1122,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
MessageReference reference = MessageReference.Factory.createReference(message, queue); MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(message.getAddressSimpleString(), queue)) { if (context.isAlreadyAcked(context.getAddress(message), queue)) {
reference.setAlreadyAcked(); reference.setAlreadyAcked();
if (tx != null) { if (tx != null) {
queue.acknowledge(tx, reference); queue.acknowledge(tx, reference);
@ -1261,7 +1261,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup; byte[] bridgeDupBytes = (byte[]) bridgeDup;
DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress())); DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (context.getTransaction() == null) { if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager)); context.setTransaction(new TransactionImpl(storageManager));
@ -1284,7 +1284,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
boolean isDuplicate = false; boolean isDuplicate = false;
if (duplicateIDBytes != null) { if (duplicateIDBytes != null) {
cache = getDuplicateIDCache(message.getAddressSimpleString()); cache = getDuplicateIDCache(context.getAddress(message));
isDuplicate = cache.contains(duplicateIDBytes); isDuplicate = cache.contains(duplicateIDBytes);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -51,7 +52,7 @@ public interface RoutingContext {
void setRoutingType(RoutingType routingType); void setRoutingType(RoutingType routingType);
SimpleString getAddress(); SimpleString getAddress(Message message);
RoutingType getRoutingType(); RoutingType getRoutingType();
} }

View File

@ -93,7 +93,7 @@ public class DivertImpl implements Divert {
Message copy = null; Message copy = null;
// Shouldn't copy if it's not routed anywhere else // Shouldn't copy if it's not routed anywhere else
if (!forwardAddress.equals(context.getAddress())) { if (!forwardAddress.equals(context.getAddress(message))) {
long id = storageManager.generateID(); long id = storageManager.generateID();
copy = message.copy(id); copy = message.copy(id);

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RouteContextList;
@ -92,7 +93,10 @@ public final class RoutingContextImpl implements RoutingContext {
} }
@Override @Override
public SimpleString getAddress() { public SimpleString getAddress(Message message) {
if (address == null && message != null) {
return message.getAddressSimpleString();
}
return address; return address;
} }

View File

@ -34,22 +34,17 @@ public class GroovyRun {
public static final String HORNETQ_235 = "HORNETQ-235"; public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247"; public static final String HORNETQ_247 = "HORNETQ-247";
public static final String WORD_START = "**SERVER STARTED**";
public static Binding binding = new Binding(); public static Binding binding = new Binding();
public static GroovyShell shell = new GroovyShell(binding); public static GroovyShell shell = new GroovyShell(binding);
// Called with reflection /**
public static void doTest(String script, String... arg) throws Throwable { * This can be called from the scripts as well.
int i = 0; * The scripts will use this method instead of its own groovy method.
for (String a : arg) { * As a classloader operation needs to be done here.
System.out.println("[" + (i++) + "]=" + a); */
} public static Object evaluate(String script,
System.out.println(); String[] arg) throws URISyntaxException, IOException {
return evaluate(script, "arg", arg);
evaluate(script, "arg", arg);
System.out.println(WORD_START);
} }
/** /**
@ -57,7 +52,7 @@ public class GroovyRun {
* The scripts will use this method instead of its own groovy method. * The scripts will use this method instead of its own groovy method.
* As a classloader operation needs to be done here. * As a classloader operation needs to be done here.
*/ */
public static void evaluate(String script, public static Object evaluate(String script,
String argVariableName, String argVariableName,
String[] arg) throws URISyntaxException, IOException { String[] arg) throws URISyntaxException, IOException {
URL scriptURL = GroovyRun.class.getClassLoader().getResource(script); URL scriptURL = GroovyRun.class.getClassLoader().getResource(script);
@ -68,16 +63,20 @@ public class GroovyRun {
setVariable(argVariableName, arg); setVariable(argVariableName, arg);
shell.evaluate(scriptURI); return shell.evaluate(scriptURI);
} }
public static void setVariable(String name, Object arg) { public static void setVariable(String name, Object arg) {
binding.setVariable(name, arg); binding.setVariable(name, arg);
} }
public static Object getVariable(String name) {
return binding.getVariable(name);
}
// Called with reflection // Called with reflection
public static void execute(String script) throws Throwable { public static Object execute(String script) throws Throwable {
shell.evaluate(script); return shell.evaluate(script);
} }
public static void assertNotNull(Object value) { public static void assertNotNull(Object value) {

View File

@ -0,0 +1,50 @@
package servers
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.JournalType
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
String folder = arg[0];
String id = "server"
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
System.out.println("folder:: " + folder);
configuration.setBrokerInstance(new File(folder + "/" + id));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
configuration.addAddressesSetting("myQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 1024 * 1024).setPageSizeBytes(1024));
// if the client is using the wrong address, it will wrongly block
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeBytes(10 * 1024).setPageSizeBytes(1024));
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getJMSServerManager().createQueue(true, "myQueue", null, true);

View File

@ -0,0 +1,47 @@
package meshTest
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
System.out.println("Receiving ");
MessageConsumer consumer = session.createConsumer(queue)
connection.start()
for (int i = 0; i < 500; i++) {
BytesMessage bytesMessage = (BytesMessage) consumer.receive(5000);
GroovyRun.assertNotNull(bytesMessage)
if (i % 100) {
session.commit();
}
}
session.commit();
// Defined on AddressConfigTest.java at the test with setVariable
latch.countDown();

View File

@ -0,0 +1,50 @@
package meshTest
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
println("sending...")
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 500; i++) {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(new byte[512]);
producer.send(bytesMessage);
// we send a big batch as that should be enough to cause blocking on the address
// if the wrong address is being used
if (i % 100 == 0) {
session.commit();
}
}
session.commit();
connection.close();
System.out.println("Message sent");

View File

@ -16,7 +16,7 @@ package clients
* limitations under the License. * limitations under the License.
*/ */
// This script is called by sendMessages.groovy // This script is called by sendMessagesPagingPaging.groovy
import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class AddressConfigTest extends VersionedBaseTest {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}));
return combinations;
}
public AddressConfigTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testClientSenderServerAddressSettings() throws Throwable {
evaluate(serverClassloader, "addressConfig/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath());
CountDownLatch latch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", latch);
AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "addressConfig/receiveMessages.groovy", "receive");
} catch (Throwable e) {
errors.incrementAndGet();
}
}
};
t.start();
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "addressConfig/sendMessagesAddress.groovy", "send");
} catch (Throwable e) {
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender is blocking by mistake", latch.await(10, TimeUnit.SECONDS));
} finally {
t.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t.isAlive()) {
t.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -108,13 +108,13 @@ public class ExportImportTest extends VersionedBaseTest {
setVariable(senderClassloader, "legacy", false); setVariable(senderClassloader, "legacy", false);
setVariable(senderClassloader, "persistent", true); setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "sender"); startServer(serverFolder.getRoot(), senderClassloader, "sender");
callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader); stopServer(senderClassloader);
if (sender.startsWith("ARTEMIS-1")) { if (sender.startsWith("ARTEMIS-1")) {
callScript(senderClassloader, "exportimport/export1X.groovy", serverFolder.getRoot().getAbsolutePath()); evaluate(senderClassloader, "exportimport/export1X.groovy", serverFolder.getRoot().getAbsolutePath());
} else { } else {
callScript(senderClassloader, "exportimport/export.groovy", serverFolder.getRoot().getAbsolutePath()); evaluate(senderClassloader, "exportimport/export.groovy", serverFolder.getRoot().getAbsolutePath());
} }
setVariable(receiverClassloader, "legacy", legacyPrefixes); setVariable(receiverClassloader, "legacy", legacyPrefixes);
@ -124,10 +124,10 @@ public class ExportImportTest extends VersionedBaseTest {
setVariable(receiverClassloader, "sort", sender.startsWith("ARTEMIS-1")); setVariable(receiverClassloader, "sort", sender.startsWith("ARTEMIS-1"));
callScript(receiverClassloader, "exportimport/import.groovy", serverFolder.getRoot().getAbsolutePath()); evaluate(receiverClassloader, "exportimport/import.groovy", serverFolder.getRoot().getAbsolutePath());
setVariable(receiverClassloader, "latch", null); setVariable(receiverClassloader, "latch", null);
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
} finally { } finally {
setVariable(receiverClassloader, "legacy", false); setVariable(receiverClassloader, "legacy", false);
} }

View File

@ -95,14 +95,14 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
public void testSendReceive() throws Throwable { public void testSendReceive() throws Throwable {
setVariable(senderClassloader, "persistent", true); setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest"); startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader); stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true); setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest"); startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
setVariable(receiverClassloader, "latch", null); setVariable(receiverClassloader, "latch", null);
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
} }
} }

View File

@ -82,8 +82,8 @@ public class MeshTest extends ServerBaseTest {
@Test @Test
public void testSendReceive() throws Throwable { public void testSendReceive() throws Throwable {
setVariable(receiverClassloader, "latch", null); setVariable(receiverClassloader, "latch", null);
callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
} }
@Test @Test
@ -96,7 +96,7 @@ public class MeshTest extends ServerBaseTest {
@Override @Override
public void run() { public void run() {
try { try {
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveNonDurableSubscription"); evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveNonDurableSubscription");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
errors.incrementAndGet(); errors.incrementAndGet();
@ -106,7 +106,7 @@ public class MeshTest extends ServerBaseTest {
t.start(); t.start();
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
callScript(senderClassloader,"meshTest/sendMessages.groovy", server, sender, "sendTopic"); evaluate(senderClassloader,"meshTest/sendMessages.groovy", server, sender, "sendTopic");
t.join(); t.join();

View File

@ -76,8 +76,8 @@ public class SendAckTest extends ServerBaseTest {
@Test @Test
public void testSendReceive() throws Throwable { public void testSendReceive() throws Throwable {
callScript(senderClassloader, "sendAckTest/sendAckMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "sendAckTest/sendAckMessages.groovy", server, sender, "sendAckMessages");
callScript(receiverClassloader, "sendAckTest/sendAckMessages.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "sendAckTest/sendAckMessages.groovy", server, receiver, "receiveMessages");
} }
} }

View File

@ -91,15 +91,15 @@ public class SerializationTest extends VersionedBaseTest {
@Test @Test
public void testSerializeFactory() throws Throwable { public void testSerializeFactory() throws Throwable {
File file = serverFolder.newFile("objects.ser"); File file = serverFolder.newFile("objects.ser");
callScript(senderClassloader, "serial/serial.groovy", file.getAbsolutePath(), "write", sender); evaluate(senderClassloader, "serial/serial.groovy", file.getAbsolutePath(), "write", sender);
callScript(receiverClassloader, "serial/serial.groovy", file.getAbsolutePath(), "read", receiver); evaluate(receiverClassloader, "serial/serial.groovy", file.getAbsolutePath(), "read", receiver);
} }
@Test @Test
public void testJBMSerializeFactory() throws Throwable { public void testJBMSerializeFactory() throws Throwable {
File file = serverFolder.newFile("objectsjbm.ser"); File file = serverFolder.newFile("objectsjbm.ser");
callScript(senderClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "write", sender); evaluate(senderClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "write", sender);
callScript(receiverClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "read", receiver); evaluate(receiverClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "read", receiver);
} }
} }

View File

@ -28,7 +28,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.utils.RunnableEx;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -81,11 +80,11 @@ public abstract class VersionedBaseTest {
loaderMap.clear(); loaderMap.clear();
} }
protected static void callScript(ClassLoader loader, String script, String... arguments) throws Exception { protected static Object evaluate(ClassLoader loader, String script, String... arguments) throws Exception {
tclCall(loader, () -> { return tclCall(loader, () -> {
Class clazz = loader.loadClass(GroovyRun.class.getName()); Class clazz = loader.loadClass(GroovyRun.class.getName());
Method method = clazz.getMethod("doTest", String.class, String[].class); Method method = clazz.getMethod("evaluate", String.class, String[].class);
method.invoke(null, script, arguments); return method.invoke(null, script, arguments);
}); });
} }
@ -94,28 +93,41 @@ public abstract class VersionedBaseTest {
Class clazz = loader.loadClass(GroovyRun.class.getName()); Class clazz = loader.loadClass(GroovyRun.class.getName());
Method method = clazz.getMethod("setVariable", String.class, Object.class); Method method = clazz.getMethod("setVariable", String.class, Object.class);
method.invoke(null, name, object); method.invoke(null, name, object);
return null;
}); });
} }
protected static void callExecute(ClassLoader loader, String script) throws Exception { protected static Object setVariable(ClassLoader loader, String name) throws Exception {
tclCall(loader, () -> { return tclCall(loader, () -> {
Class clazz = loader.loadClass(GroovyRun.class.getName());
Method method = clazz.getMethod("getVariable", String.class);
return method.invoke(null, name);
});
}
protected static Object execute(ClassLoader loader, String script) throws Exception {
return tclCall(loader, () -> {
Class clazz = loader.loadClass(GroovyRun.class.getName()); Class clazz = loader.loadClass(GroovyRun.class.getName());
Method method = clazz.getMethod("execute", String.class); Method method = clazz.getMethod("execute", String.class);
method.invoke(null, script); return method.invoke(null, script);
}); });
} }
protected static void tclCall(ClassLoader loader, RunnableEx run) throws Exception { protected static Object tclCall(ClassLoader loader, CallIt run) throws Exception {
ClassLoader original = Thread.currentThread().getContextClassLoader(); ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(loader); Thread.currentThread().setContextClassLoader(loader);
try { try {
run.run(); return run.run();
} finally { } finally {
Thread.currentThread().setContextClassLoader(original); Thread.currentThread().setContextClassLoader(original);
} }
} }
public interface CallIt {
Object run() throws Exception;
}
protected static ClassLoader defineClassLoader(String classPath) throws MalformedURLException { protected static ClassLoader defineClassLoader(String classPath) throws MalformedURLException {
String[] classPathArray = classPath.split(File.pathSeparator); String[] classPathArray = classPath.split(File.pathSeparator);
URL[] elements = new URL[classPathArray.length]; URL[] elements = new URL[classPathArray.length];
@ -188,10 +200,10 @@ public abstract class VersionedBaseTest {
scriptToUse = "servers/hornetqServer.groovy"; scriptToUse = "servers/hornetqServer.groovy";
} }
callScript(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver);
} }
public void stopServer(ClassLoader loader) throws Throwable { public void stopServer(ClassLoader loader) throws Throwable {
callExecute(loader, "server.stop()"); execute(loader, "server.stop()");
} }
} }