This commit is contained in:
Clebert Suconic 2017-05-12 10:06:43 -04:00
commit 5e6687e0e2
90 changed files with 2693 additions and 1088 deletions

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
public class ProcessBuilder {

View File

@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
/**

View File

@ -32,7 +32,7 @@ public final class DataConstants {
public static final int SIZE_FLOAT = 4;
static final int SIZE_CHAR = 2;
public static final int SIZE_CHAR = 2;
public static final byte TRUE = 1;

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -14,924 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
import static org.apache.activemq.artemis.utils.DataConstants.CHAR;
import static org.apache.activemq.artemis.utils.DataConstants.DOUBLE;
import static org.apache.activemq.artemis.utils.DataConstants.FLOAT;
import static org.apache.activemq.artemis.utils.DataConstants.INT;
import static org.apache.activemq.artemis.utils.DataConstants.LONG;
import static org.apache.activemq.artemis.utils.DataConstants.NULL;
import static org.apache.activemq.artemis.utils.DataConstants.SHORT;
import static org.apache.activemq.artemis.utils.DataConstants.STRING;
/**
* Property Value Conversion.
* <p>
* This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
* (Version 1.1 April 12, 2002).
* <p>
* @deprecated Use {@link org.apache.activemq.artemis.utils.collections.TypedProperties instead}
*/
public final class TypedProperties {
@Deprecated
public class TypedProperties extends org.apache.activemq.artemis.utils.collections.TypedProperties {
private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
private boolean internalProperties;
public TypedProperties() {
}
/**
* Return the number of properites
* */
public int size() {
return properties.size();
}
public int getMemoryOffset() {
// The estimate is basically the encode size + 2 object references for each entry in the map
// Note we don't include the attributes or anything else since they already included in the memory estimate
// of the ServerMessage
return properties == null ? 0 : size + 2 * DataConstants.SIZE_INT * properties.size();
}
public TypedProperties(final TypedProperties other) {
properties = other.properties == null ? null : new HashMap<>(other.properties);
size = other.size;
}
public boolean hasInternalProperties() {
return internalProperties;
}
public void putBooleanProperty(final SimpleString key, final boolean value) {
checkCreateProperties();
doPutValue(key, new BooleanValue(value));
}
public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
doPutValue(key, new ByteValue(value));
}
public void putBytesProperty(final SimpleString key, final byte[] value) {
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new BytesValue(value));
}
public void putShortProperty(final SimpleString key, final short value) {
checkCreateProperties();
doPutValue(key, new ShortValue(value));
}
public void putIntProperty(final SimpleString key, final int value) {
checkCreateProperties();
doPutValue(key, new IntValue(value));
}
public void putLongProperty(final SimpleString key, final long value) {
checkCreateProperties();
doPutValue(key, new LongValue(value));
}
public void putFloatProperty(final SimpleString key, final float value) {
checkCreateProperties();
doPutValue(key, new FloatValue(value));
}
public void putDoubleProperty(final SimpleString key, final double value) {
checkCreateProperties();
doPutValue(key, new DoubleValue(value));
}
public void putSimpleStringProperty(final SimpleString key, final SimpleString value) {
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
public void putNullValue(final SimpleString key) {
checkCreateProperties();
doPutValue(key, new NullValue());
}
public void putCharProperty(final SimpleString key, final char value) {
checkCreateProperties();
doPutValue(key, new CharValue(value));
}
public void putTypedProperties(final TypedProperties otherProps) {
if (otherProps == null || otherProps.properties == null) {
return;
}
checkCreateProperties();
Set<Entry<SimpleString, PropertyValue>> otherEntries = otherProps.properties.entrySet();
for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries) {
doPutValue(otherEntry.getKey(), otherEntry.getValue());
}
}
public Object getProperty(final SimpleString key) {
return doGetProperty(key);
}
public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Boolean.valueOf(null);
} else if (value instanceof Boolean) {
return (Boolean) value;
} else if (value instanceof SimpleString) {
return Boolean.valueOf(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Byte.valueOf(null);
} else if (value instanceof Byte) {
return (Byte) value;
} else if (value instanceof SimpleString) {
return Byte.parseByte(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Character getCharProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
throw new NullPointerException("Invalid conversion: " + key);
}
if (value instanceof Character) {
return ((Character) value);
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return null;
} else if (value instanceof byte[]) {
return (byte[]) value;
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Double.valueOf(null);
} else if (value instanceof Float) {
return ((Float) value).doubleValue();
} else if (value instanceof Double) {
return (Double) value;
} else if (value instanceof SimpleString) {
return Double.parseDouble(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Integer.valueOf(null);
} else if (value instanceof Integer) {
return (Integer) value;
} else if (value instanceof Byte) {
return ((Byte) value).intValue();
} else if (value instanceof Short) {
return ((Short) value).intValue();
} else if (value instanceof SimpleString) {
return Integer.parseInt(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Long.valueOf(null);
} else if (value instanceof Long) {
return (Long) value;
} else if (value instanceof Byte) {
return ((Byte) value).longValue();
} else if (value instanceof Short) {
return ((Short) value).longValue();
} else if (value instanceof Integer) {
return ((Integer) value).longValue();
} else if (value instanceof SimpleString) {
return Long.parseLong(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Short.valueOf(null);
} else if (value instanceof Byte) {
return ((Byte) value).shortValue();
} else if (value instanceof Short) {
return (Short) value;
} else if (value instanceof SimpleString) {
return Short.parseShort(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null)
return Float.valueOf(null);
if (value instanceof Float) {
return ((Float) value);
}
if (value instanceof SimpleString) {
return Float.parseFloat(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return null;
}
if (value instanceof SimpleString) {
return (SimpleString) value;
} else if (value instanceof Boolean) {
return new SimpleString(value.toString());
} else if (value instanceof Character) {
return new SimpleString(value.toString());
} else if (value instanceof Byte) {
return new SimpleString(value.toString());
} else if (value instanceof Short) {
return new SimpleString(value.toString());
} else if (value instanceof Integer) {
return new SimpleString(value.toString());
} else if (value instanceof Long) {
return new SimpleString(value.toString());
} else if (value instanceof Float) {
return new SimpleString(value.toString());
} else if (value instanceof Double) {
return new SimpleString(value.toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Object removeProperty(final SimpleString key) {
return doRemoveProperty(key);
}
public boolean containsProperty(final SimpleString key) {
if (size == 0) {
return false;
} else {
return properties.containsKey(key);
}
}
public Set<SimpleString> getPropertyNames() {
if (size == 0) {
return Collections.emptySet();
} else {
return properties.keySet();
}
}
public synchronized void decode(final ByteBuf buffer) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
properties = null;
} else {
int numHeaders = buffer.readInt();
properties = new HashMap<>(numHeaders);
size = 0;
for (int i = 0; i < numHeaders; i++) {
int len = buffer.readInt();
byte[] data = new byte[len];
buffer.readBytes(data);
SimpleString key = new SimpleString(data);
byte type = buffer.readByte();
PropertyValue val;
switch (type) {
case NULL: {
val = new NullValue();
doPutValue(key, val);
break;
}
case CHAR: {
val = new CharValue(buffer);
doPutValue(key, val);
break;
}
case BOOLEAN: {
val = new BooleanValue(buffer);
doPutValue(key, val);
break;
}
case BYTE: {
val = new ByteValue(buffer);
doPutValue(key, val);
break;
}
case BYTES: {
val = new BytesValue(buffer);
doPutValue(key, val);
break;
}
case SHORT: {
val = new ShortValue(buffer);
doPutValue(key, val);
break;
}
case INT: {
val = new IntValue(buffer);
doPutValue(key, val);
break;
}
case LONG: {
val = new LongValue(buffer);
doPutValue(key, val);
break;
}
case FLOAT: {
val = new FloatValue(buffer);
doPutValue(key, val);
break;
}
case DOUBLE: {
val = new DoubleValue(buffer);
doPutValue(key, val);
break;
}
case STRING: {
val = new StringValue(buffer);
doPutValue(key, val);
break;
}
default: {
throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
}
}
}
}
}
public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
buffer.writeByte(DataConstants.NULL);
} else {
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeInt(properties.size());
for (Map.Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
SimpleString s = entry.getKey();
byte[] data = s.getData();
buffer.writeInt(data.length);
buffer.writeBytes(data);
entry.getValue().write(buffer);
}
}
}
public int getEncodeSize() {
if (properties == null) {
return DataConstants.SIZE_BYTE;
} else {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
}
}
public void clear() {
if (properties != null) {
properties.clear();
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("TypedProperties[");
if (properties != null) {
Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Entry<SimpleString, PropertyValue> iterItem = iter.next();
sb.append(iterItem.getKey() + "=");
// it seems weird but it's right!!
// The first getValue is from the EntrySet
// The second is to convert the PropertyValue into the actual value
Object theValue = iterItem.getValue().getValue();
if (theValue == null) {
sb.append("NULL-value");
} else if (theValue instanceof byte[]) {
sb.append("[" + ByteUtil.maxString(ByteUtil.bytesToHex((byte[]) theValue, 2), 150) + ")");
if (iterItem.getKey().toString().startsWith("_AMQ_ROUTE_TO")) {
sb.append(",bytesAsLongs(");
try {
ByteBuffer buff = ByteBuffer.wrap((byte[]) theValue);
while (buff.hasRemaining()) {
long bindingID = buff.getLong();
sb.append(bindingID);
if (buff.hasRemaining()) {
sb.append(",");
}
}
} catch (Throwable e) {
sb.append("error-converting-longs=" + e.getMessage());
}
sb.append("]");
}
} else {
sb.append(theValue.toString());
}
if (iter.hasNext()) {
sb.append(",");
}
}
}
return sb.append("]").toString();
}
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties() {
if (properties == null) {
properties = new HashMap<>();
}
}
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
if (key.startsWith(AMQ_PROPNAME)) {
internalProperties = true;
}
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null) {
size += value.encodeSize() - oldValue.encodeSize();
} else {
size += SimpleString.sizeofString(key) + value.encodeSize();
}
}
private synchronized Object doRemoveProperty(final SimpleString key) {
if (properties == null) {
return null;
}
PropertyValue val = properties.remove(key);
if (val == null) {
return null;
} else {
size -= SimpleString.sizeofString(key) + val.encodeSize();
return val.getValue();
}
}
private synchronized Object doGetProperty(final Object key) {
if (size == 0) {
return null;
}
PropertyValue val = properties.get(key);
if (val == null) {
return null;
} else {
return val.getValue();
}
}
// Inner classes ------------------------------------------------------------------------------
private abstract static class PropertyValue {
abstract Object getValue();
abstract void write(ByteBuf buffer);
abstract int encodeSize();
@Override
public String toString() {
return "" + getValue();
}
}
private static final class NullValue extends PropertyValue {
private NullValue() {
}
@Override
public Object getValue() {
return null;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.NULL);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE;
}
}
private static final class BooleanValue extends PropertyValue {
final boolean val;
private BooleanValue(final boolean val) {
this.val = val;
}
private BooleanValue(final ByteBuf buffer) {
val = buffer.readBoolean();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BOOLEAN);
buffer.writeBoolean(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN;
}
}
private static final class ByteValue extends PropertyValue {
final byte val;
private ByteValue(final byte val) {
this.val = val;
}
private ByteValue(final ByteBuf buffer) {
val = buffer.readByte();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTE);
buffer.writeByte(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_BYTE;
}
}
private static final class BytesValue extends PropertyValue {
final byte[] val;
private BytesValue(final byte[] val) {
this.val = val;
}
private BytesValue(final ByteBuf buffer) {
int len = buffer.readInt();
val = new byte[len];
buffer.readBytes(val);
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTES);
buffer.writeInt(val.length);
buffer.writeBytes(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + val.length;
}
}
private static final class ShortValue extends PropertyValue {
final short val;
private ShortValue(final short val) {
this.val = val;
}
private ShortValue(final ByteBuf buffer) {
val = buffer.readShort();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.SHORT);
buffer.writeShort(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_SHORT;
}
}
private static final class IntValue extends PropertyValue {
final int val;
private IntValue(final int val) {
this.val = val;
}
private IntValue(final ByteBuf buffer) {
val = buffer.readInt();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.INT);
buffer.writeInt(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
}
}
private static final class LongValue extends PropertyValue {
final long val;
private LongValue(final long val) {
this.val = val;
}
private LongValue(final ByteBuf buffer) {
val = buffer.readLong();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.LONG);
buffer.writeLong(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
}
private static final class FloatValue extends PropertyValue {
final float val;
private FloatValue(final float val) {
this.val = val;
}
private FloatValue(final ByteBuf buffer) {
val = Float.intBitsToFloat(buffer.readInt());
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.FLOAT);
buffer.writeInt(Float.floatToIntBits(val));
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_FLOAT;
}
}
private static final class DoubleValue extends PropertyValue {
final double val;
private DoubleValue(final double val) {
this.val = val;
}
private DoubleValue(final ByteBuf buffer) {
val = Double.longBitsToDouble(buffer.readLong());
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.DOUBLE);
buffer.writeLong(Double.doubleToLongBits(val));
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_DOUBLE;
}
}
private static final class CharValue extends PropertyValue {
final char val;
private CharValue(final char val) {
this.val = val;
}
private CharValue(final ByteBuf buffer) {
val = (char) buffer.readShort();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.CHAR);
buffer.writeShort((short) val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_CHAR;
}
}
private static final class StringValue extends PropertyValue {
final SimpleString val;
private StringValue(final SimpleString val) {
this.val = val;
}
private StringValue(final ByteBuf buffer) {
val = SimpleString.readSimpleString(buffer);
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.STRING);
SimpleString.writeSimpleString(buffer, val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
}
}
public boolean isEmpty() {
return properties.isEmpty();
}
public Map<String, Object> getMap() {
Map<String, Object> m = new HashMap<>();
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
Object val = entry.getValue().getValue();
if (val instanceof SimpleString) {
m.put(entry.getKey().toString(), ((SimpleString) val).toString());
} else {
m.put(entry.getKey().toString(), val);
}
}
return m;
}
/**
* Helper for MapMessage#setObjectProperty(String, Object)
*
* @param key The SimpleString key
* @param value The Object value
* @param properties The typed properties
*/
public static void setObjectProperty(final SimpleString key, final Object value, final TypedProperties properties) {
if (value == null) {
properties.putNullValue(key);
} else if (value instanceof Boolean) {
properties.putBooleanProperty(key, (Boolean) value);
} else if (value instanceof Byte) {
properties.putByteProperty(key, (Byte) value);
} else if (value instanceof Character) {
properties.putCharProperty(key, (Character) value);
} else if (value instanceof Short) {
properties.putShortProperty(key, (Short) value);
} else if (value instanceof Integer) {
properties.putIntProperty(key, (Integer) value);
} else if (value instanceof Long) {
properties.putLongProperty(key, (Long) value);
} else if (value instanceof Float) {
properties.putFloatProperty(key, (Float) value);
} else if (value instanceof Double) {
properties.putDoubleProperty(key, (Double) value);
} else if (value instanceof String) {
properties.putSimpleStringProperty(key, new SimpleString((String) value));
} else if (value instanceof SimpleString) {
properties.putSimpleStringProperty(key, (SimpleString) value);
} else if (value instanceof byte[]) {
properties.putBytesProperty(key, (byte[]) value);
} else {
throw new ActiveMQPropertyConversionException(value.getClass() + " is not a valid property type");
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
import java.util.AbstractSet;
import java.util.Iterator;

View File

@ -0,0 +1,504 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.LongFunction;
import com.google.common.collect.Lists;
/**
* Map from long to an Object.
*
* Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
* <ol>
* <li>No boxing/unboxing from long -> Long
* <li>Open hash map with linear probing, no node allocations to store the values
* </ol>
*
* @param <V>
*/
@SuppressWarnings("unchecked")
public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
private static final float MapFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
private final Section<V>[] sections;
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
public ConcurrentLongHashMap(int expectedItems, int numSections) {
checkArgument(numSections > 0);
if (expectedItems < numSections) {
expectedItems = numSections;
}
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
sections[i] = new Section<>(perSectionCapacity);
}
}
public int size() {
int size = 0;
for (Section<V> s : sections) {
size += s.size;
}
return size;
}
long getUsedBucketCount() {
long usedBucketCount = 0;
for (Section<V> s : sections) {
usedBucketCount += s.usedBuckets;
}
return usedBucketCount;
}
public long capacity() {
long capacity = 0;
for (Section<V> s : sections) {
capacity += s.capacity;
}
return capacity;
}
public boolean isEmpty() {
for (Section<V> s : sections) {
if (s.size != 0) {
return false;
}
}
return true;
}
public V get(long key) {
long h = hash(key);
return getSection(h).get(key, (int) h);
}
public boolean containsKey(long key) {
return get(key) != null;
}
public V put(long key, V value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).put(key, value, (int) h, false, null);
}
public V putIfAbsent(long key, V value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).put(key, value, (int) h, true, null);
}
public V computeIfAbsent(long key, LongFunction<V> provider) {
checkNotNull(provider);
long h = hash(key);
return getSection(h).put(key, null, (int) h, true, provider);
}
public V remove(long key) {
long h = hash(key);
return getSection(h).remove(key, null, (int) h);
}
public boolean remove(long key, Object value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).remove(key, value, (int) h) != null;
}
private Section<V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
return sections[sectionIdx];
}
public void clear() {
for (Section<V> s : sections) {
s.clear();
}
}
public void forEach(EntryProcessor<V> processor) {
for (Section<V> s : sections) {
s.forEach(processor);
}
}
/**
* @return a new list of all keys (makes a copy)
*/
public List<Long> keys() {
List<Long> keys = Lists.newArrayListWithExpectedSize((int) size());
forEach((key, value) -> keys.add(key));
return keys;
}
public ConcurrentLongHashSet keysLongHashSet() {
ConcurrentLongHashSet concurrentLongHashSet = new ConcurrentLongHashSet(size());
forEach((key, value) -> concurrentLongHashSet.add(key));
return concurrentLongHashSet;
}
public List<V> values() {
List<V> values = Lists.newArrayListWithExpectedSize((int) size());
forEach((key, value) -> values.add(value));
return values;
}
public interface EntryProcessor<V> {
void accept(long key, V value);
}
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section<V> extends StampedLock {
private long[] keys;
private V[] values;
private int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
Section(int capacity) {
this.capacity = alignToPowerOfTwo(capacity);
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * MapFillFactor);
}
@SuppressWarnings("NonAtomicVolatileUpdate")
V get(long key, int keyHash) {
int bucket = keyHash;
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
try {
while (true) {
int capacity = this.capacity;
bucket = signSafeMod(bucket, capacity);
// First try optimistic locking
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (!acquiredLock && validate(stamp)) {
// The values we have read are consistent
if (storedKey == key) {
return storedValue != DeletedValue ? storedValue : null;
} else if (storedValue == EmptyValue) {
// Not found
return null;
}
} else {
// Fallback to acquiring read lock
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
storedKey = keys[bucket];
storedValue = values[bucket];
}
if (capacity != this.capacity) {
// There has been a rehashing. We need to restart the search
bucket = keyHash;
continue;
}
if (storedKey == key) {
return storedValue != DeletedValue ? storedValue : null;
} else if (storedValue == EmptyValue) {
// Not found
return null;
}
}
++bucket;
}
} finally {
if (acquiredLock) {
unlockRead(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valueProvider) {
int bucket = keyHash;
long stamp = writeLock();
int capacity = this.capacity;
// Remember where we find the first available spot
int firstDeletedKey = -1;
try {
while (true) {
bucket = signSafeMod(bucket, capacity);
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (storedKey == key) {
if (storedValue == EmptyValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
++usedBuckets;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (!onlyIfAbsent) {
// Over written an old value for same key
values[bucket] = value;
return storedValue;
} else {
return storedValue;
}
} else if (storedValue == EmptyValue) {
// Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
// key, we should write at that position
if (firstDeletedKey != -1) {
bucket = firstDeletedKey;
} else {
++usedBuckets;
}
keys[bucket] = key;
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
// The bucket contained a different deleted key
if (firstDeletedKey == -1) {
firstDeletedKey = bucket;
}
}
++bucket;
}
} finally {
if (usedBuckets > resizeThreshold) {
try {
rehash();
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
private V remove(long key, Object value, int keyHash) {
int bucket = keyHash;
long stamp = writeLock();
try {
while (true) {
int capacity = this.capacity;
bucket = signSafeMod(bucket, capacity);
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (storedKey == key) {
if (value == null || value.equals(storedValue)) {
if (storedValue == EmptyValue || storedValue == DeletedValue) {
return null;
}
--size;
V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
if (nextValueInArray == EmptyValue) {
values[bucket] = (V) EmptyValue;
--usedBuckets;
} else {
values[bucket] = (V) DeletedValue;
}
return storedValue;
} else {
return null;
}
} else if (storedValue == EmptyValue) {
// Key wasn't found
return null;
}
++bucket;
}
} finally {
unlockWrite(stamp);
}
}
void clear() {
long stamp = writeLock();
try {
Arrays.fill(keys, 0);
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
} finally {
unlockWrite(stamp);
}
}
public void forEach(EntryProcessor<V> processor) {
long stamp = tryOptimisticRead();
int capacity = this.capacity;
long[] keys = this.keys;
V[] values = this.values;
boolean acquiredReadLock = false;
try {
// Validate no rehashing
if (!validate(stamp)) {
// Fallback to read lock
stamp = readLock();
acquiredReadLock = true;
capacity = this.capacity;
keys = this.keys;
values = this.values;
}
// Go through all the buckets for this section
for (int bucket = 0; bucket < capacity; bucket++) {
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (!acquiredReadLock && !validate(stamp)) {
// Fallback to acquiring read lock
stamp = readLock();
acquiredReadLock = true;
storedKey = keys[bucket];
storedValue = values[bucket];
}
if (storedValue != DeletedValue && storedValue != EmptyValue) {
processor.accept(storedKey, storedValue);
}
}
} finally {
if (acquiredReadLock) {
unlockRead(stamp);
}
}
}
private void rehash() {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];
// Re-hash table
for (int i = 0; i < keys.length; i++) {
long storedKey = keys[i];
V storedValue = values[i];
if (storedValue != EmptyValue && storedValue != DeletedValue) {
insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue);
}
}
capacity = newCapacity;
keys = newKeys;
values = newValues;
usedBuckets = size;
resizeThreshold = (int) (capacity * MapFillFactor);
}
private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
int bucket = (int) hash(key);
while (true) {
bucket = signSafeMod(bucket, keys.length);
V storedValue = values[bucket];
if (storedValue == EmptyValue) {
// The bucket is empty, so we can use it
keys[bucket] = key;
values[bucket] = value;
return;
}
++bucket;
}
}
}
private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
static long hash(long key) {
long hash = key * HashMixer;
hash ^= hash >>> R;
hash *= HashMixer;
return hash;
}
static int signSafeMod(long n, int Max) {
return (int) n & (Max - 1);
}
static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
}

View File

@ -0,0 +1,423 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.StampedLock;
/**
* Concurrent hash set for primitive longs
*
* Provides similar methods as a ConcurrentSet&lt;Long&gt; but since it's an open hash map with linear probing, no node
* allocations are required to store the values.
* <p>
* Items <strong>MUST</strong> be >= 0.
*/
public class ConcurrentLongHashSet {
private static final long EmptyItem = -1L;
private static final long DeletedItem = -2L;
private static final float SetFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
private final Section[] sections;
public interface ConsumerLong {
void accept(long item);
}
public ConcurrentLongHashSet() {
this(DefaultExpectedItems);
}
public ConcurrentLongHashSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
public ConcurrentLongHashSet(int expectedItems, final int numSections) {
checkArgument(numSections > 0);
if (expectedItems < numSections) {
expectedItems = numSections;
}
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor);
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
sections[i] = new Section(perSectionCapacity);
}
}
public int size() {
int size = 0;
for (Section s : sections) {
size += s.size;
}
return size;
}
public long capacity() {
long capacity = 0;
for (Section s : sections) {
capacity += s.capacity;
}
return capacity;
}
public boolean isEmpty() {
for (Section s : sections) {
if (s.size != 0) {
return false;
}
}
return true;
}
long getUsedBucketCount() {
long usedBucketCount = 0;
for (Section s : sections) {
usedBucketCount += s.usedBuckets;
}
return usedBucketCount;
}
public boolean contains(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).contains(item, (int) h);
}
public boolean add(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).add(item, (int) h);
}
/**
* Remove an existing entry if found
*
* @param item
* @return true if removed or false if item was not present
*/
public boolean remove(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).remove(item, (int) h);
}
private Section getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
return sections[sectionIdx];
}
public void clear() {
for (Section s : sections) {
s.clear();
}
}
public void forEach(ConsumerLong processor) {
for (Section s : sections) {
s.forEach(processor);
}
}
/**
* @return a new list of all keys (makes a copy)
*/
public Set<Long> items() {
Set<Long> items = new HashSet<>();
forEach(items::add);
return items;
}
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
// Keys and values are stored interleaved in the table array
private long[] table;
private int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
Section(int capacity) {
this.capacity = alignToPowerOfTwo(capacity);
this.table = new long[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * SetFillFactor);
Arrays.fill(table, EmptyItem);
}
boolean contains(long item, int hash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
int bucket = signSafeMod(hash, capacity);
try {
while (true) {
// First try optimistic locking
long storedItem = table[bucket];
if (!acquiredLock && validate(stamp)) {
// The values we have read are consistent
if (item == storedItem) {
return true;
} else if (storedItem == EmptyItem) {
// Not found
return false;
}
} else {
// Fallback to acquiring read lock
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
bucket = signSafeMod(hash, capacity);
storedItem = table[bucket];
}
if (item == storedItem) {
return true;
} else if (storedItem == EmptyItem) {
// Not found
return false;
}
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
if (acquiredLock) {
unlockRead(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
boolean add(long item, long hash) {
long stamp = writeLock();
int bucket = signSafeMod(hash, capacity);
// Remember where we find the first available spot
int firstDeletedItem = -1;
try {
while (true) {
long storedItem = table[bucket];
if (item == storedItem) {
// Item was already in set
return false;
} else if (storedItem == EmptyItem) {
// Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
// key, we should write at that position
if (firstDeletedItem != -1) {
bucket = firstDeletedItem;
} else {
++usedBuckets;
}
table[bucket] = item;
++size;
return true;
} else if (storedItem == DeletedItem) {
// The bucket contained a different deleted key
if (firstDeletedItem == -1) {
firstDeletedItem = bucket;
}
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThreshold) {
try {
rehash();
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
private boolean remove(long item, int hash) {
long stamp = writeLock();
int bucket = signSafeMod(hash, capacity);
try {
while (true) {
long storedItem = table[bucket];
if (item == storedItem) {
--size;
cleanBucket(bucket);
return true;
} else if (storedItem == EmptyItem) {
// Key wasn't found
return false;
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
unlockWrite(stamp);
}
}
private void cleanBucket(int bucket) {
int nextInArray = (bucket + 1) & (table.length - 1);
if (table[nextInArray] == EmptyItem) {
table[bucket] = EmptyItem;
--usedBuckets;
} else {
table[bucket] = DeletedItem;
}
}
void clear() {
long stamp = writeLock();
try {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
} finally {
unlockWrite(stamp);
}
}
public void forEach(ConsumerLong processor) {
long stamp = tryOptimisticRead();
long[] table = this.table;
boolean acquiredReadLock = false;
try {
// Validate no rehashing
if (!validate(stamp)) {
// Fallback to read lock
stamp = readLock();
acquiredReadLock = true;
table = this.table;
}
// Go through all the buckets for this section
for (int bucket = 0; bucket < table.length; bucket++) {
long storedItem = table[bucket];
if (!acquiredReadLock && !validate(stamp)) {
// Fallback to acquiring read lock
stamp = readLock();
acquiredReadLock = true;
storedItem = table[bucket];
}
if (storedItem != DeletedItem && storedItem != EmptyItem) {
processor.accept(storedItem);
}
}
} finally {
if (acquiredReadLock) {
unlockRead(stamp);
}
}
}
private void rehash() {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newTable = new long[newCapacity];
Arrays.fill(newTable, EmptyItem);
// Re-hash table
for (int i = 0; i < table.length; i++) {
long storedItem = table[i];
if (storedItem != EmptyItem && storedItem != DeletedItem) {
insertKeyValueNoLock(newTable, newCapacity, storedItem);
}
}
capacity = newCapacity;
table = newTable;
usedBuckets = size;
resizeThreshold = (int) (capacity * SetFillFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
int bucket = signSafeMod(hash(item), capacity);
while (true) {
long storedKey = table[bucket];
if (storedKey == EmptyItem) {
// The bucket is empty, so we can use it
table[bucket] = item;
return;
}
bucket = (bucket + 1) & (table.length - 1);
}
}
}
private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
static long hash(long key) {
long hash = key * HashMixer;
hash ^= hash >>> R;
hash *= HashMixer;
return hash;
}
static int signSafeMod(long n, int Max) {
return (int) (n & (Max - 1));
}
static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
static void checkBiggerEqualZero(long n) {
if (n < 0L) {
throw new IllegalArgumentException("Keys and values must be >= 0");
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
import java.util.Set;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
public interface LinkedList<E> {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
import java.lang.reflect.Array;
import java.util.NoSuchElementException;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
/**
* A type of linked list which maintains items according to a priority

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
package org.apache.activemq.artemis.utils.collections;
import java.lang.reflect.Array;
import java.util.NoSuchElementException;

View File

@ -0,0 +1,939 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
import static org.apache.activemq.artemis.utils.DataConstants.CHAR;
import static org.apache.activemq.artemis.utils.DataConstants.DOUBLE;
import static org.apache.activemq.artemis.utils.DataConstants.FLOAT;
import static org.apache.activemq.artemis.utils.DataConstants.INT;
import static org.apache.activemq.artemis.utils.DataConstants.LONG;
import static org.apache.activemq.artemis.utils.DataConstants.NULL;
import static org.apache.activemq.artemis.utils.DataConstants.SHORT;
import static org.apache.activemq.artemis.utils.DataConstants.STRING;
/**
* Property Value Conversion.
* <p>
* This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
* (Version 1.1 April 12, 2002).
* <p>
*/
public class TypedProperties {
private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
private boolean internalProperties;
public TypedProperties() {
}
/**
* Return the number of properites
* */
public int size() {
return properties.size();
}
public int getMemoryOffset() {
// The estimate is basically the encode size + 2 object references for each entry in the map
// Note we don't include the attributes or anything else since they already included in the memory estimate
// of the ServerMessage
return properties == null ? 0 : size + 2 * DataConstants.SIZE_INT * properties.size();
}
public TypedProperties(final TypedProperties other) {
properties = other.properties == null ? null : new HashMap<>(other.properties);
size = other.size;
}
public boolean hasInternalProperties() {
return internalProperties;
}
public void putBooleanProperty(final SimpleString key, final boolean value) {
checkCreateProperties();
doPutValue(key, new BooleanValue(value));
}
public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
doPutValue(key, new ByteValue(value));
}
public void putBytesProperty(final SimpleString key, final byte[] value) {
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new BytesValue(value));
}
public void putShortProperty(final SimpleString key, final short value) {
checkCreateProperties();
doPutValue(key, new ShortValue(value));
}
public void putIntProperty(final SimpleString key, final int value) {
checkCreateProperties();
doPutValue(key, new IntValue(value));
}
public void putLongProperty(final SimpleString key, final long value) {
checkCreateProperties();
doPutValue(key, new LongValue(value));
}
public void putFloatProperty(final SimpleString key, final float value) {
checkCreateProperties();
doPutValue(key, new FloatValue(value));
}
public void putDoubleProperty(final SimpleString key, final double value) {
checkCreateProperties();
doPutValue(key, new DoubleValue(value));
}
public void putSimpleStringProperty(final SimpleString key, final SimpleString value) {
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
public void putNullValue(final SimpleString key) {
checkCreateProperties();
doPutValue(key, new NullValue());
}
public void putCharProperty(final SimpleString key, final char value) {
checkCreateProperties();
doPutValue(key, new CharValue(value));
}
public void putTypedProperties(final TypedProperties otherProps) {
if (otherProps == null || otherProps.properties == null) {
return;
}
checkCreateProperties();
Set<Entry<SimpleString, PropertyValue>> otherEntries = otherProps.properties.entrySet();
for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries) {
doPutValue(otherEntry.getKey(), otherEntry.getValue());
}
}
public Object getProperty(final SimpleString key) {
return doGetProperty(key);
}
public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Boolean.valueOf(null);
} else if (value instanceof Boolean) {
return (Boolean) value;
} else if (value instanceof SimpleString) {
return Boolean.valueOf(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Byte.valueOf(null);
} else if (value instanceof Byte) {
return (Byte) value;
} else if (value instanceof SimpleString) {
return Byte.parseByte(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Character getCharProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
throw new NullPointerException("Invalid conversion: " + key);
}
if (value instanceof Character) {
return ((Character) value);
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return null;
} else if (value instanceof byte[]) {
return (byte[]) value;
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Double.valueOf(null);
} else if (value instanceof Float) {
return ((Float) value).doubleValue();
} else if (value instanceof Double) {
return (Double) value;
} else if (value instanceof SimpleString) {
return Double.parseDouble(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Integer.valueOf(null);
} else if (value instanceof Integer) {
return (Integer) value;
} else if (value instanceof Byte) {
return ((Byte) value).intValue();
} else if (value instanceof Short) {
return ((Short) value).intValue();
} else if (value instanceof SimpleString) {
return Integer.parseInt(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Long.valueOf(null);
} else if (value instanceof Long) {
return (Long) value;
} else if (value instanceof Byte) {
return ((Byte) value).longValue();
} else if (value instanceof Short) {
return ((Short) value).longValue();
} else if (value instanceof Integer) {
return ((Integer) value).longValue();
} else if (value instanceof SimpleString) {
return Long.parseLong(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return Short.valueOf(null);
} else if (value instanceof Byte) {
return ((Byte) value).shortValue();
} else if (value instanceof Short) {
return (Short) value;
} else if (value instanceof SimpleString) {
return Short.parseShort(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null)
return Float.valueOf(null);
if (value instanceof Float) {
return ((Float) value);
}
if (value instanceof SimpleString) {
return Float.parseFloat(((SimpleString) value).toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
Object value = doGetProperty(key);
if (value == null) {
return null;
}
if (value instanceof SimpleString) {
return (SimpleString) value;
} else if (value instanceof Boolean) {
return new SimpleString(value.toString());
} else if (value instanceof Character) {
return new SimpleString(value.toString());
} else if (value instanceof Byte) {
return new SimpleString(value.toString());
} else if (value instanceof Short) {
return new SimpleString(value.toString());
} else if (value instanceof Integer) {
return new SimpleString(value.toString());
} else if (value instanceof Long) {
return new SimpleString(value.toString());
} else if (value instanceof Float) {
return new SimpleString(value.toString());
} else if (value instanceof Double) {
return new SimpleString(value.toString());
}
throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
}
public Object removeProperty(final SimpleString key) {
return doRemoveProperty(key);
}
public boolean containsProperty(final SimpleString key) {
if (size == 0) {
return false;
} else {
return properties.containsKey(key);
}
}
public Set<SimpleString> getPropertyNames() {
if (size == 0) {
return Collections.emptySet();
} else {
return properties.keySet();
}
}
public synchronized void decode(final ByteBuf buffer) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
properties = null;
} else {
int numHeaders = buffer.readInt();
properties = new HashMap<>(numHeaders);
size = 0;
for (int i = 0; i < numHeaders; i++) {
int len = buffer.readInt();
byte[] data = new byte[len];
buffer.readBytes(data);
SimpleString key = new SimpleString(data);
byte type = buffer.readByte();
PropertyValue val;
switch (type) {
case NULL: {
val = new NullValue();
doPutValue(key, val);
break;
}
case CHAR: {
val = new CharValue(buffer);
doPutValue(key, val);
break;
}
case BOOLEAN: {
val = new BooleanValue(buffer);
doPutValue(key, val);
break;
}
case BYTE: {
val = new ByteValue(buffer);
doPutValue(key, val);
break;
}
case BYTES: {
val = new BytesValue(buffer);
doPutValue(key, val);
break;
}
case SHORT: {
val = new ShortValue(buffer);
doPutValue(key, val);
break;
}
case INT: {
val = new IntValue(buffer);
doPutValue(key, val);
break;
}
case LONG: {
val = new LongValue(buffer);
doPutValue(key, val);
break;
}
case FLOAT: {
val = new FloatValue(buffer);
doPutValue(key, val);
break;
}
case DOUBLE: {
val = new DoubleValue(buffer);
doPutValue(key, val);
break;
}
case STRING: {
val = new StringValue(buffer);
doPutValue(key, val);
break;
}
default: {
throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
}
}
}
}
}
public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
buffer.writeByte(DataConstants.NULL);
} else {
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeInt(properties.size());
for (Map.Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
SimpleString s = entry.getKey();
byte[] data = s.getData();
buffer.writeInt(data.length);
buffer.writeBytes(data);
entry.getValue().write(buffer);
}
}
}
public int getEncodeSize() {
if (properties == null) {
return DataConstants.SIZE_BYTE;
} else {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
}
}
public void clear() {
if (properties != null) {
properties.clear();
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("TypedProperties[");
if (properties != null) {
Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Entry<SimpleString, PropertyValue> iterItem = iter.next();
sb.append(iterItem.getKey() + "=");
// it seems weird but it's right!!
// The first getValue is from the EntrySet
// The second is to convert the PropertyValue into the actual value
Object theValue = iterItem.getValue().getValue();
if (theValue == null) {
sb.append("NULL-value");
} else if (theValue instanceof byte[]) {
sb.append("[" + ByteUtil.maxString(ByteUtil.bytesToHex((byte[]) theValue, 2), 150) + ")");
if (iterItem.getKey().toString().startsWith("_AMQ_ROUTE_TO")) {
sb.append(",bytesAsLongs(");
try {
ByteBuffer buff = ByteBuffer.wrap((byte[]) theValue);
while (buff.hasRemaining()) {
long bindingID = buff.getLong();
sb.append(bindingID);
if (buff.hasRemaining()) {
sb.append(",");
}
}
} catch (Throwable e) {
sb.append("error-converting-longs=" + e.getMessage());
}
sb.append("]");
}
} else {
sb.append(theValue.toString());
}
if (iter.hasNext()) {
sb.append(",");
}
}
}
return sb.append("]").toString();
}
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties() {
if (properties == null) {
properties = new HashMap<>();
}
}
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
if (key.startsWith(AMQ_PROPNAME)) {
internalProperties = true;
}
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null) {
size += value.encodeSize() - oldValue.encodeSize();
} else {
size += SimpleString.sizeofString(key) + value.encodeSize();
}
}
private synchronized Object doRemoveProperty(final SimpleString key) {
if (properties == null) {
return null;
}
PropertyValue val = properties.remove(key);
if (val == null) {
return null;
} else {
size -= SimpleString.sizeofString(key) + val.encodeSize();
return val.getValue();
}
}
private synchronized Object doGetProperty(final Object key) {
if (size == 0) {
return null;
}
PropertyValue val = properties.get(key);
if (val == null) {
return null;
} else {
return val.getValue();
}
}
// Inner classes ------------------------------------------------------------------------------
private abstract static class PropertyValue {
abstract Object getValue();
abstract void write(ByteBuf buffer);
abstract int encodeSize();
@Override
public String toString() {
return "" + getValue();
}
}
private static final class NullValue extends PropertyValue {
private NullValue() {
}
@Override
public Object getValue() {
return null;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.NULL);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE;
}
}
private static final class BooleanValue extends PropertyValue {
final boolean val;
private BooleanValue(final boolean val) {
this.val = val;
}
private BooleanValue(final ByteBuf buffer) {
val = buffer.readBoolean();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BOOLEAN);
buffer.writeBoolean(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN;
}
}
private static final class ByteValue extends PropertyValue {
final byte val;
private ByteValue(final byte val) {
this.val = val;
}
private ByteValue(final ByteBuf buffer) {
val = buffer.readByte();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTE);
buffer.writeByte(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_BYTE;
}
}
private static final class BytesValue extends PropertyValue {
final byte[] val;
private BytesValue(final byte[] val) {
this.val = val;
}
private BytesValue(final ByteBuf buffer) {
int len = buffer.readInt();
val = new byte[len];
buffer.readBytes(val);
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTES);
buffer.writeInt(val.length);
buffer.writeBytes(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + val.length;
}
}
private static final class ShortValue extends PropertyValue {
final short val;
private ShortValue(final short val) {
this.val = val;
}
private ShortValue(final ByteBuf buffer) {
val = buffer.readShort();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.SHORT);
buffer.writeShort(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_SHORT;
}
}
private static final class IntValue extends PropertyValue {
final int val;
private IntValue(final int val) {
this.val = val;
}
private IntValue(final ByteBuf buffer) {
val = buffer.readInt();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.INT);
buffer.writeInt(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
}
}
private static final class LongValue extends PropertyValue {
final long val;
private LongValue(final long val) {
this.val = val;
}
private LongValue(final ByteBuf buffer) {
val = buffer.readLong();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.LONG);
buffer.writeLong(val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
}
private static final class FloatValue extends PropertyValue {
final float val;
private FloatValue(final float val) {
this.val = val;
}
private FloatValue(final ByteBuf buffer) {
val = Float.intBitsToFloat(buffer.readInt());
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.FLOAT);
buffer.writeInt(Float.floatToIntBits(val));
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_FLOAT;
}
}
private static final class DoubleValue extends PropertyValue {
final double val;
private DoubleValue(final double val) {
this.val = val;
}
private DoubleValue(final ByteBuf buffer) {
val = Double.longBitsToDouble(buffer.readLong());
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.DOUBLE);
buffer.writeLong(Double.doubleToLongBits(val));
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_DOUBLE;
}
}
private static final class CharValue extends PropertyValue {
final char val;
private CharValue(final char val) {
this.val = val;
}
private CharValue(final ByteBuf buffer) {
val = (char) buffer.readShort();
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.CHAR);
buffer.writeShort((short) val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_CHAR;
}
}
private static final class StringValue extends PropertyValue {
final SimpleString val;
private StringValue(final SimpleString val) {
this.val = val;
}
private StringValue(final ByteBuf buffer) {
val = SimpleString.readSimpleString(buffer);
}
@Override
public Object getValue() {
return val;
}
@Override
public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.STRING);
SimpleString.writeSimpleString(buffer, val);
}
@Override
public int encodeSize() {
return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
}
}
public boolean isEmpty() {
return properties.isEmpty();
}
public Map<String, Object> getMap() {
Map<String, Object> m = new HashMap<>();
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
Object val = entry.getValue().getValue();
if (val instanceof SimpleString) {
m.put(entry.getKey().toString(), ((SimpleString) val).toString());
} else {
m.put(entry.getKey().toString(), val);
}
}
return m;
}
/**
* Helper for MapMessage#setObjectProperty(String, Object)
*
* @param key The SimpleString key
* @param value The Object value
* @param properties The typed properties
*/
public static void setObjectProperty(final SimpleString key, final Object value, final TypedProperties properties) {
if (value == null) {
properties.putNullValue(key);
} else if (value instanceof Boolean) {
properties.putBooleanProperty(key, (Boolean) value);
} else if (value instanceof Byte) {
properties.putByteProperty(key, (Byte) value);
} else if (value instanceof Character) {
properties.putCharProperty(key, (Character) value);
} else if (value instanceof Short) {
properties.putShortProperty(key, (Short) value);
} else if (value instanceof Integer) {
properties.putIntProperty(key, (Integer) value);
} else if (value instanceof Long) {
properties.putLongProperty(key, (Long) value);
} else if (value instanceof Float) {
properties.putFloatProperty(key, (Float) value);
} else if (value instanceof Double) {
properties.putDoubleProperty(key, (Double) value);
} else if (value instanceof String) {
properties.putSimpleStringProperty(key, new SimpleString((String) value));
} else if (value instanceof SimpleString) {
properties.putSimpleStringProperty(key, (SimpleString) value);
} else if (value instanceof byte[]) {
properties.putBytesProperty(key, (byte[]) value);
} else {
throw new ActiveMQPropertyConversionException(value.getClass() + " is not a valid property type");
}
}
}

View File

@ -23,7 +23,7 @@ import java.lang.reflect.Method;
import java.util.Locale;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
import org.apache.commons.beanutils.IntrospectionContext;
import org.jboss.logging.Logger;

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.utils;
import java.util.Iterator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -21,6 +21,7 @@ import java.util.Iterator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -0,0 +1,405 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
import com.google.common.collect.Lists;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
public class ConcurrentLongHashMapTest {
@Test
public void simpleInsertions() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
assertFalse(map.isEmpty());
assertNull(map.put(2, "two"));
assertNull(map.put(3, "three"));
assertEquals(map.size(), 3);
assertEquals(map.get(1), "one");
assertEquals(map.size(), 3);
assertEquals(map.remove(1), "one");
assertEquals(map.size(), 2);
assertEquals(map.get(1), null);
assertEquals(map.get(5), null);
assertEquals(map.size(), 2);
assertNull(map.put(1, "one"));
assertEquals(map.size(), 3);
assertEquals(map.put(1, "uno"), "one");
assertEquals(map.size(), 3);
}
@Test
public void testRemove() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
assertFalse(map.isEmpty());
assertFalse(map.remove(0, "zero"));
assertFalse(map.remove(1, "uno"));
assertFalse(map.isEmpty());
assertTrue(map.remove(1, "one"));
assertTrue(map.isEmpty());
}
@Test
public void testNegativeUsedBucketCount() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
map.put(0, "zero");
assertEquals(1, map.getUsedBucketCount());
map.put(0, "zero1");
assertEquals(1, map.getUsedBucketCount());
map.remove(0);
assertEquals(0, map.getUsedBucketCount());
map.remove(0);
assertEquals(0, map.getUsedBucketCount());
}
@Test
public void testRehashing() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
for (int i = 0; i < n; i++) {
map.put(i, i);
}
assertEquals(map.capacity(), 2 * n);
assertEquals(map.size(), n);
}
@Test
public void testRehashingWithDeletes() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
for (int i = 0; i < n / 2; i++) {
map.put(i, i);
}
for (int i = 0; i < n / 2; i++) {
map.remove(i);
}
for (int i = n; i < (2 * n); i++) {
map.put(i, i);
}
assertEquals(map.capacity(), 2 * n);
assertEquals(map.size(), n);
}
@Test
public void concurrentInsertions() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
String value = "value";
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = random.nextLong();
// Ensure keys are uniques
key -= key % (threadIdx + 1);
map.put(key, value);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void concurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
String value = "value";
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = random.nextLong();
// Ensure keys are uniques
key -= key % (threadIdx + 1);
map.put(key, value);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void testIteration() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
map.put(0, "zero");
assertEquals(map.keys(), Lists.newArrayList(0L));
assertEquals(map.values(), Lists.newArrayList("zero"));
map.remove(0);
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
map.put(0, "zero");
map.put(1, "one");
map.put(2, "two");
List<Long> keys = map.keys();
Collections.sort(keys);
assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
List<String> values = map.values();
Collections.sort(values);
assertEquals(values, Lists.newArrayList("one", "two", "zero"));
map.put(1, "uno");
keys = map.keys();
Collections.sort(keys);
assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
values = map.values();
Collections.sort(values);
assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
map.clear();
assertTrue(map.isEmpty());
}
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
// Pick 2 keys that fall into the same bucket
long key1 = 1;
long key2 = 27;
int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
assertEquals(bucket1, bucket2);
assertEquals(map.put(key1, "value-1"), null);
assertEquals(map.put(key2, "value-2"), null);
assertEquals(map.size(), 2);
assertEquals(map.remove(key1), "value-1");
assertEquals(map.size(), 1);
assertEquals(map.put(key1, "value-1-overwrite"), null);
assertEquals(map.size(), 2);
assertEquals(map.remove(key1), "value-1-overwrite");
assertEquals(map.size(), 1);
assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
assertEquals(map.get(key2), "value-2-overwrite");
assertEquals(map.size(), 1);
assertEquals(map.remove(key2), "value-2-overwrite");
assertTrue(map.isEmpty());
}
@Test
public void testPutIfAbsent() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertEquals(map.putIfAbsent(1, "one"), null);
assertEquals(map.get(1), "one");
assertEquals(map.putIfAbsent(1, "uno"), "one");
assertEquals(map.get(1), "one");
}
@Test
public void testComputeIfAbsent() {
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = key -> counter.getAndIncrement();
assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
assertEquals(map.get(0).intValue(), 0);
assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
assertEquals(map.get(1).intValue(), 1);
assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
assertEquals(map.get(1).intValue(), 1);
assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
assertEquals(map.get(2).intValue(), 2);
}
int Iterations = 1;
int ReadIterations = 100;
int N = 1_000_000;
public void benchConcurrentLongHashMap() throws Exception {
// public static void main(String args[]) {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
public void benchConcurrentHashMap() throws Exception {
ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<>(N, 0.66f, 1);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
void benchHashMap() throws Exception {
HashMap<Long, String> map = new HashMap<>(N, 0.66f);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
public static void main(String[] args) throws Exception {
ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
long start = System.nanoTime();
// t.benchHashMap();
long end = System.nanoTime();
System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
start = System.nanoTime();
t.benchConcurrentHashMap();
end = System.nanoTime();
System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
start = System.nanoTime();
// t.benchConcurrentLongHashMap();
end = System.nanoTime();
System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
}
}

View File

@ -0,0 +1,249 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ConcurrentLongHashSetTest {
@Test
public void simpleInsertions() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
assertTrue(set.isEmpty());
assertTrue(set.add(1));
assertFalse(set.isEmpty());
assertTrue(set.add(2));
assertTrue(set.add(3));
assertEquals(set.size(), 3);
assertTrue(set.contains(1));
assertEquals(set.size(), 3);
assertTrue(set.remove(1));
assertEquals(set.size(), 2);
assertFalse(set.contains(1));
assertFalse(set.contains(5));
assertEquals(set.size(), 2);
assertTrue(set.add(1));
assertEquals(set.size(), 3);
assertFalse(set.add(1));
assertEquals(set.size(), 3);
}
@Test
public void testRemove() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
assertTrue(set.isEmpty());
assertTrue(set.add(1));
assertFalse(set.isEmpty());
assertFalse(set.remove(0));
assertFalse(set.isEmpty());
assertTrue(set.remove(1));
assertTrue(set.isEmpty());
}
@Test
public void testRehashing() {
int n = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
for (int i = 0; i < n; i++) {
set.add(i);
}
assertEquals(set.capacity(), 2 * n);
assertEquals(set.size(), n);
}
@Test
public void testRehashingWithDeletes() {
int n = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
for (int i = 0; i < n / 2; i++) {
set.add(i);
}
for (int i = 0; i < n / 2; i++) {
set.remove(i);
}
for (int i = n; i < (2 * n); i++) {
set.add(i);
}
assertEquals(set.capacity(), 2 * n);
assertEquals(set.size(), n);
}
@Test
public void concurrentInsertions() throws Throwable {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
set.add(key);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(set.size(), N * nThreads);
executor.shutdown();
}
@Test
public void concurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashSet map = new ConcurrentLongHashSet();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
map.add(key);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void testIteration() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
assertEquals(set.items(), Collections.emptySet());
set.add(0L);
assertEquals(set.items(), Sets.newHashSet(0L));
set.remove(0L);
assertEquals(set.items(), Collections.emptySet());
set.add(0L);
set.add(1L);
set.add(2L);
List<Long> values = Lists.newArrayList(set.items());
Collections.sort(values);
assertEquals(values, Lists.newArrayList(0L, 1L, 2L));
set.clear();
assertTrue(set.isEmpty());
}
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1);
// Pick 2 keys that fall into the same bucket
long key1 = 1;
long key2 = 27;
int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets);
int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets);
assertEquals(bucket1, bucket2);
assertTrue(set.add(key1));
assertTrue(set.add(key2));
assertEquals(set.size(), 2);
assertTrue(set.remove(key1));
assertEquals(set.size(), 1);
assertTrue(set.add(key1));
assertEquals(set.size(), 2);
assertTrue(set.remove(key1));
assertEquals(set.size(), 1);
assertFalse(set.add(key2));
assertTrue(set.contains(key2));
assertEquals(set.size(), 1);
assertTrue(set.remove(key2));
assertTrue(set.isEmpty());
}
}

View File

@ -40,10 +40,10 @@ import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.PriorityLinkedList;
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.jboss.logging.Logger;
public final class ClientConsumerImpl implements ClientConsumerInternal {

View File

@ -31,8 +31,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/**
* A ClientMessageImpl

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public interface ClientMessageInternal extends ClientMessage {

View File

@ -62,11 +62,11 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

View File

@ -35,7 +35,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -39,8 +39,8 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.server.management;
import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/**
* A Notification

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.reader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public class MapMessageUtil extends MessageUtil {

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.util;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.Assert;
import org.junit.Test;

View File

@ -34,7 +34,7 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {

View File

@ -53,9 +53,9 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* ActiveMQ Artemis implementation of a JMS Connection.

View File

@ -39,7 +39,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/**
* NOTE: this class forwards {@link #setDisableMessageID(boolean)} and

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jms.client;
import javax.jms.IllegalStateException;
import java.util.Set;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* Restricts what can be called on context passed in wrapped CompletionListener.

View File

@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.AddressControl;
@ -57,7 +58,6 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.Notification;
@ -93,8 +93,8 @@ import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.XMLUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -30,7 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
/**
* Super class for Journal maintenances such as clean up and Compactor
@ -56,7 +55,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
private ActiveMQBuffer writingChannel;
private final Set<Long> recordsSnapshot = new ConcurrentHashSet<>();
private final ConcurrentLongHashSet recordsSnapshot;
protected final List<JournalFile> newDataFiles = new ArrayList<>();
@ -67,14 +66,14 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final ConcurrentLongHashSet recordsSnapshot,
final long nextOrderingID) {
super();
this.journal = journal;
this.filesRepository = filesRepository;
this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID;
this.recordsSnapshot.addAll(recordsSnapshot);
this.recordsSnapshot = recordsSnapshot;
}
// Public --------------------------------------------------------

View File

@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@ -43,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
/**
* Journal used at a replicating backup server during the synchronization of data with the 'live'
@ -54,7 +53,7 @@ public final class FileWrapperJournal extends JournalBase {
private final ReentrantLock lockAppend = new ReentrantLock();
private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<>();
private final ConcurrentLongHashMap<AtomicInteger> transactions = new ConcurrentLongHashMap<>();
private final JournalImpl journal;
protected volatile JournalFile currentFile;
@ -181,7 +180,7 @@ public final class FileWrapperJournal extends JournalBase {
IOCompletion callback,
boolean lineUpContext) throws Exception {
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
AtomicInteger value = transactions.remove(Long.valueOf(txID));
AtomicInteger value = transactions.remove(txID);
if (value != null) {
commitRecord.setNumberOfRecords(value.get());
}
@ -195,7 +194,7 @@ public final class FileWrapperJournal extends JournalBase {
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
AtomicInteger value = transactions.get(Long.valueOf(txID));
AtomicInteger value = transactions.get(txID);
if (value != null) {
prepareRecord.setNumberOfRecords(value.get());
}
@ -204,7 +203,7 @@ public final class FileWrapperJournal extends JournalBase {
private int count(long txID) throws ActiveMQException {
AtomicInteger defaultValue = new AtomicInteger(1);
AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
if (count != null) {
return count.incrementAndGet();
}
@ -219,7 +218,7 @@ public final class FileWrapperJournal extends JournalBase {
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
AtomicInteger value = transactions.remove(Long.valueOf(txID));
AtomicInteger value = transactions.remove(txID);
if (value != null) {
rollbackRecord.setNumberOfRecords(value.get());
}

View File

@ -18,12 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -41,6 +37,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.jboss.logging.Logger;
public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider {
@ -53,11 +51,11 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private static final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<>();
private final ConcurrentLongHashMap<PendingTransaction> pendingTransactions = new ConcurrentLongHashMap<>();
private final Map<Long, JournalRecord> newRecords = new HashMap<>();
private final ConcurrentLongHashMap<JournalRecord> newRecords = new ConcurrentLongHashMap<>();
private final Map<Long, JournalTransaction> newTransactions = new HashMap<>();
private final ConcurrentLongHashMap<JournalTransaction> newTransactions = new ConcurrentLongHashMap<>();
/**
* Commands that happened during compacting
@ -120,18 +118,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
return newDataFiles;
}
public Map<Long, JournalRecord> getNewRecords() {
public ConcurrentLongHashMap<JournalRecord> getNewRecords() {
return newRecords;
}
public Map<Long, JournalTransaction> getNewTransactions() {
public ConcurrentLongHashMap<JournalTransaction> getNewTransactions() {
return newTransactions;
}
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final ConcurrentLongHashSet recordsSnapshot,
final long firstFileID) {
super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
}
@ -628,7 +626,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
}
@Override
public Map<Long, JournalRecord> getRecords() {
public ConcurrentLongHashMap<JournalRecord> getRecords() {
return newRecords;
}

View File

@ -31,8 +31,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@ -57,7 +55,6 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TestableJournal;
@ -70,15 +67,18 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.jboss.logging.Logger;
/**
@ -168,12 +168,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final JournalFilesRepository filesRepository;
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
private final Set<Long> pendingRecords = new ConcurrentHashSet<>();
private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet();
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
@ -345,7 +345,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
@Override
public Map<Long, JournalRecord> getRecords() {
public ConcurrentLongHashMap<JournalRecord> getRecords() {
return records;
}
@ -1487,12 +1487,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return;
}
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet()) {
compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
entry.getValue().setCompacting();
}
transactions.forEach((id, pendingTransaction) -> {
compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
pendingTransaction.setCompacting();
});
// We will calculate the new records during compacting, what will take the position the records will take
// after compacting
@ -1540,9 +1540,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
newDatafiles = localCompactor.getNewDataFiles();
// Restore newRecords created during compacting
for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet()) {
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
localCompactor.getNewRecords().forEach((id, newRecord) -> {
records.put(id, newRecord);
});
// Restore compacted dataFiles
for (int i = newDatafiles.size() - 1; i >= 0; i--) {
@ -1559,9 +1559,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Replay pending commands (including updates, deletes and commits)
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
newTransaction.replaceRecordProvider(this);
}
localCompactor.getNewTransactions().forEach((id, newTransaction) -> newTransaction.replaceRecordProvider(this));
localCompactor.replayPendingCommands();
@ -1569,7 +1567,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This has to be done after the replay pending commands, as we need to delete commits
// that happened during the compacting
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
localCompactor.getNewTransactions().forEach((id, newTransaction) -> {
if (logger.isTraceEnabled()) {
logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
}
@ -1579,7 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else {
ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
}
}
});
} finally {
journalLock.writeLock().unlock();
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.util.Map;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
/**
* This is an interface used only internally.
@ -29,5 +29,5 @@ public interface JournalRecordProvider {
JournalCompactor getCompactor();
Map<Long, JournalRecord> getRecords();
ConcurrentLongHashMap<JournalRecord> getRecords();
}

View File

@ -28,7 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;

View File

@ -28,8 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTConnectionMananager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public class MQTTRetainMessageManager {

View File

@ -77,8 +77,8 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;

View File

@ -44,8 +44,8 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;

View File

@ -22,7 +22,7 @@ import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* The connection manager used in non-managed environments.

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistryImpl;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
public final class RecoveryManager {

View File

@ -48,6 +48,7 @@ import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
@ -84,7 +85,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -107,7 +107,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.ListUtil;
import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public class ActiveMQServerControlImpl extends AbstractControl implements ActiveMQServerControl, NotificationEmitter, org.apache.activemq.artemis.core.server.management.NotificationListener {
// Constants -----------------------------------------------------

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public class QueueControlImpl extends AbstractControl implements QueueControl {

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public interface PageIterator extends LinkedListIterator<PagedReference> {

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public interface PageSubscription {

View File

@ -55,8 +55,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
final class PageSubscriptionImpl implements PageSubscription {

View File

@ -35,8 +35,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
public final class Page implements Comparable<Page> {

View File

@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
public final class PagingManagerImpl implements PagingManager {

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {

View File

@ -82,8 +82,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -39,7 +39,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleLi
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
public final class InVMAcceptor extends AbstractAcceptor {

View File

@ -82,7 +82,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -371,7 +371,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* @throws Exception
*/
private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
Long id = Long.valueOf(msg.getId());
long id = msg.getId();
byte[] data = msg.getData();
SequentialFile channel1;
switch (msg.getFileType()) {

View File

@ -36,8 +36,8 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeLis
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager2;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -29,8 +29,8 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public interface Queue extends Bindable {

View File

@ -62,9 +62,9 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
/**

View File

@ -61,8 +61,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -34,8 +34,8 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -52,7 +53,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
@ -70,7 +70,7 @@ import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
public final class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener {

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.UnproposalListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public abstract class GroupHandlingAbstract implements GroupingHandler {

View File

@ -40,7 +40,7 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -34,9 +34,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/**
* A remote Grouping handler.

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@ -46,9 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
@ -165,13 +164,13 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
/**

View File

@ -84,15 +84,15 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.PriorityLinkedList;
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -60,7 +60,7 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
public class ScaleDownHandler {

View File

@ -59,8 +59,8 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -29,11 +31,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -89,9 +86,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
/**
* Server side Session implementation
*/

View File

@ -86,8 +86,8 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
public class ManagementServiceImpl implements ManagementService {

View File

@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.core.list;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -58,8 +58,8 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.Assert;
import org.junit.Test;

View File

@ -47,10 +47,10 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Test;

View File

@ -91,7 +91,7 @@ under the License.
so methods are passed null arguments to trigger the exact same exceptions.
-->
<Class name="org.apache.activemq.artemis.utils.TypedProperties"/>
<Class name="org.apache.activemq.artemis.utils.collections.TypedProperties"/>
<Or>
<Method name="getFloatProperty"/>
<Method name="getDoubleProperty"/>

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -39,14 +40,13 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;

View File

@ -38,7 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -68,7 +68,6 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
@ -78,9 +77,9 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

View File

@ -45,7 +45,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;

View File

@ -40,7 +40,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -16,6 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -39,22 +55,6 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.junit.Before;
import org.junit.Test;
public class MqttPluginTest extends MQTTTestSupport {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.stress.journal;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -238,12 +237,15 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
reloadJournal();
Collection<Long> records = journal.getRecords().keySet();
System.out.println("Deleting everything!");
for (Long delInfo : records) {
journal.appendDeleteRecord(delInfo, false);
}
journal.getRecords().forEach((id, record) -> {
try {
journal.appendDeleteRecord(id, false);
} catch (Exception e) {
new RuntimeException(e);
}
});
journal.forceMoveNextFile();

View File

@ -48,8 +48,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -22,8 +22,8 @@ import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@ -31,11 +31,10 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public class FakeQueue implements Queue {

View File

@ -49,7 +49,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOff
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

View File

@ -23,8 +23,8 @@ import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.LinkedListImpl;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Before;
import org.junit.Test;