Fix concurrency of ComplexMetrics.java (#9134)

This commit is contained in:
Lucas Capistrant 2020-01-15 08:19:45 -06:00 committed by Roman Leventov
parent b2877119d0
commit 4716e0b585
1 changed files with 29 additions and 14 deletions

View File

@ -22,14 +22,14 @@ package org.apache.druid.segment.serde;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
/** /**
* ComplexMetrics houses a mapping of serde names to affiliated ComplexMetricSerde objects.
*/ */
public class ComplexMetrics public class ComplexMetrics
{ {
private static final Map<String, ComplexMetricSerde> COMPLEX_SERIALIZERS = new HashMap<>(); private static final ConcurrentHashMap<String, ComplexMetricSerde> COMPLEX_SERIALIZERS = new ConcurrentHashMap<>();
@Nullable @Nullable
public static ComplexMetricSerde getSerdeForType(String type) public static ComplexMetricSerde getSerdeForType(String type)
@ -37,19 +37,34 @@ public class ComplexMetrics
return COMPLEX_SERIALIZERS.get(type); return COMPLEX_SERIALIZERS.get(type);
} }
/**
* Register a serde name -> ComplexMetricSerde mapping.
*
* <p>
* If the specified serde key string is already used and the supplied ComplexMetricSerde is not of the same
* type as the existing value in the map for said key, an ISE is thrown.
* </p>
*
* @param type The serde name used as the key in the map.
* @param serde The ComplexMetricSerde object to be associated with the 'type' in the map.
*/
public static void registerSerde(String type, ComplexMetricSerde serde) public static void registerSerde(String type, ComplexMetricSerde serde)
{ {
if (COMPLEX_SERIALIZERS.containsKey(type)) { COMPLEX_SERIALIZERS.compute(type, (key, value) -> {
if (!COMPLEX_SERIALIZERS.get(type).getClass().getName().equals(serde.getClass().getName())) { if (value == null) {
throw new ISE( return serde;
"Incompatible serializer for type[%s] already exists. Expected [%s], found [%s].", } else {
type, if (!value.getClass().getName().equals(serde.getClass().getName())) {
serde.getClass().getName(), throw new ISE(
COMPLEX_SERIALIZERS.get(type).getClass().getName() "Incompatible serializer for type[%s] already exists. Expected [%s], found [%s].",
); key,
serde.getClass().getName(),
value.getClass().getName()
);
} else {
return value;
}
} }
} else { });
COMPLEX_SERIALIZERS.put(type, serde);
}
} }
} }