ARTEMIS-1345 ConcurrentModificationException after copy
This commit is contained in:
parent
682216a3ec
commit
8831a570de
|
@ -80,8 +80,10 @@ public class TypedProperties {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedProperties(final TypedProperties other) {
|
public TypedProperties(final TypedProperties other) {
|
||||||
properties = other.properties == null ? null : new HashMap<>(other.properties);
|
synchronized (other) {
|
||||||
size = other.size;
|
properties = other.properties == null ? null : new HashMap<>(other.properties);
|
||||||
|
size = other.size;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasInternalProperties() {
|
public boolean hasInternalProperties() {
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
package org.apache.activemq.artemis.utils;
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
|
@ -255,4 +257,36 @@ public class TypedPropertiesTest {
|
||||||
final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(1, tooLong.length() - 1);
|
final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(1, tooLong.length() - 1);
|
||||||
Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
|
Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyingWhileMessingUp() throws Exception {
|
||||||
|
TypedProperties properties = new TypedProperties();
|
||||||
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
AtomicLong copies = new AtomicLong(0);
|
||||||
|
AtomicBoolean error = new AtomicBoolean(false);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (running.get() && !error.get()) {
|
||||||
|
try {
|
||||||
|
copies.incrementAndGet();
|
||||||
|
TypedProperties copiedProperties = new TypedProperties(properties);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
error.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
for (int i = 0; !error.get() && (i < 100 || copies.get() < 5000); i++) {
|
||||||
|
properties.putIntProperty(SimpleString.toSimpleString("key" + i), i);
|
||||||
|
}
|
||||||
|
|
||||||
|
running.set(false);
|
||||||
|
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
Assert.assertFalse(error.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2324,7 +2324,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
handled++;
|
handled++;
|
||||||
} else if (status == HandleStatus.BUSY) {
|
} else if (status == HandleStatus.BUSY) {
|
||||||
holder.iter.repeat();
|
try {
|
||||||
|
holder.iter.repeat();
|
||||||
|
} catch (NoSuchElementException e) {
|
||||||
|
// this could happen if there was an exception on the queue handling
|
||||||
|
// and it returned BUSY because of that exception
|
||||||
|
//
|
||||||
|
// We will just log it as there's nothing else we can do now.
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
noDelivery++;
|
noDelivery++;
|
||||||
} else if (status == HandleStatus.NO_MATCH) {
|
} else if (status == HandleStatus.NO_MATCH) {
|
||||||
|
|
Loading…
Reference in New Issue