This closes #301
This commit is contained in:
commit
d81ba8f81b
|
@ -20,6 +20,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
@ -93,6 +94,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
fileSize = channel.size();
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -117,6 +121,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
channel.force(false);
|
||||
channel.position(0);
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -125,6 +132,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
fileSize = channel.size();
|
||||
}
|
||||
|
||||
public synchronized void waitForClose() throws InterruptedException {
|
||||
while (isOpen()) {
|
||||
wait();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
|
||||
super.close();
|
||||
|
@ -145,6 +158,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
rfile.close();
|
||||
}
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -178,6 +194,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
return bytesRead;
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
if (callback != null) {
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
|
||||
|
@ -195,6 +214,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
try {
|
||||
channel.force(false);
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -211,6 +233,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
try {
|
||||
return channel.size();
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -223,6 +248,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
super.position(pos);
|
||||
channel.position(pos);
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
|
@ -291,6 +319,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
}
|
||||
|
@ -306,6 +337,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
}
|
||||
catch (ClosedChannelException e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
|
||||
}
|
||||
catch (IOException e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
|
||||
|
|
|
@ -38,6 +38,4 @@ public interface PagingStoreFactory {
|
|||
|
||||
SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
|
||||
|
||||
void criticalException(Throwable e);
|
||||
|
||||
}
|
||||
|
|
|
@ -87,10 +87,6 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
public void criticalException(Throwable e) {
|
||||
critialErrorListener.onIOException(e, e.getMessage(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
|
|
@ -559,16 +559,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
@Override
|
||||
public void setStarted(final boolean started) {
|
||||
synchronized (lock) {
|
||||
// This is to make sure that the delivery process has finished any pending delivery
|
||||
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||
lockDelivery.writeLock().lock();
|
||||
try {
|
||||
this.started = browseOnly || started;
|
||||
}
|
||||
finally {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// Outside the lock
|
||||
if (started) {
|
||||
|
@ -579,16 +571,18 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
@Override
|
||||
public void setTransferring(final boolean transferring) {
|
||||
synchronized (lock) {
|
||||
this.transferring = transferring;
|
||||
}
|
||||
|
||||
// This is to make sure that the delivery process has finished any pending delivery
|
||||
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||
lockDelivery.writeLock().lock();
|
||||
try {
|
||||
this.transferring = transferring;
|
||||
lockDelivery.writeLock().lock();
|
||||
}
|
||||
finally {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Outside the lock
|
||||
if (transferring) {
|
||||
|
|
|
@ -19,12 +19,13 @@ package org.apache.activemq.artemis.tests.integration.broadcast;
|
|||
import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.PlainConfigurator;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class JGroupsBroadcastTest {
|
||||
public class JGroupsBroadcastTest extends ActiveMQTestBase {
|
||||
|
||||
private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)";
|
||||
|
||||
|
@ -33,10 +34,13 @@ public class JGroupsBroadcastTest {
|
|||
|
||||
@Test
|
||||
public void testRefCount() throws Exception {
|
||||
JChannel channel = null;
|
||||
JChannel newChannel = null;
|
||||
|
||||
try {
|
||||
|
||||
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
|
||||
JChannel channel = new JChannel(configurator);
|
||||
channel = new JChannel(configurator);
|
||||
|
||||
String channelName1 = "channel1";
|
||||
|
||||
|
@ -71,7 +75,7 @@ public class JGroupsBroadcastTest {
|
|||
catch (Exception e) {
|
||||
}
|
||||
|
||||
JChannel newChannel = new JChannel(configurator);
|
||||
newChannel = new JChannel(configurator);
|
||||
|
||||
jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(newChannel, channelName1);
|
||||
|
||||
|
@ -85,6 +89,20 @@ public class JGroupsBroadcastTest {
|
|||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
channel.close();
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
|
||||
}
|
||||
try {
|
||||
newChannel.close();
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,10 +17,18 @@
|
|||
package org.apache.activemq.artemis.tests.integration.journal;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
|
||||
|
||||
|
@ -29,4 +37,151 @@ public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
|
|||
return new NIOSequentialFileFactory(new File(folder), true, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterrupts() throws Throwable {
|
||||
|
||||
final EncodingSupport fakeEncoding = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
buffer.writeBytes(new byte[10]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
final AtomicInteger calls = new AtomicInteger(0);
|
||||
final NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
new Exception("shutdown").printStackTrace();
|
||||
calls.incrementAndGet();
|
||||
}
|
||||
}, 1);
|
||||
|
||||
Thread threadOpen = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().interrupt();
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadOpen.start();
|
||||
threadOpen.join();
|
||||
|
||||
Thread threadClose = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
file.write(fakeEncoding, true);
|
||||
Thread.currentThread().interrupt();
|
||||
file.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadClose.start();
|
||||
threadClose.join();
|
||||
|
||||
Thread threadWrite = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
Thread.currentThread().interrupt();
|
||||
file.write(fakeEncoding, true);
|
||||
file.close();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadWrite.start();
|
||||
threadWrite.join();
|
||||
|
||||
Thread threadFill = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
Thread.currentThread().interrupt();
|
||||
file.fill(1024);
|
||||
file.close();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadFill.start();
|
||||
threadFill.join();
|
||||
|
||||
Thread threadWriteDirect = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(10);
|
||||
buffer.put(new byte[10]);
|
||||
Thread.currentThread().interrupt();
|
||||
file.writeDirect(buffer, true);
|
||||
file.close();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadWriteDirect.start();
|
||||
threadWriteDirect.join();
|
||||
|
||||
Thread threadRead = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
file.write(fakeEncoding, true);
|
||||
file.position(0);
|
||||
ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
|
||||
Thread.currentThread().interrupt();
|
||||
file.read(readBytes);
|
||||
file.close();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadRead.start();
|
||||
threadRead.join();
|
||||
|
||||
// An interrupt exception shouldn't issue a shutdown
|
||||
Assert.assertEquals(0, calls.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -782,10 +782,6 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
|
||||
static final class FakeStoreFactory implements PagingStoreFactory {
|
||||
|
||||
@Override
|
||||
public void criticalException(Throwable e) {
|
||||
}
|
||||
|
||||
final SequentialFileFactory factory;
|
||||
|
||||
public FakeStoreFactory() {
|
||||
|
|
Loading…
Reference in New Issue