Fix #3795 (Java 7 compatibility). (#3796)

* Fix #3795 (Java 7 compatibility).

Also introduce Animal Sniffer checks during the build, which would
have caught the original problems.

* Add Animal Sniffer on caffeine-cache for JDK8.
Gian Merlino 2016-12-21 13:19:13 -05:00 committed by Fangjin Yang
parent c0c34f82ad
commit 6440ddcbca
7 changed files with 114 additions and 17 deletions
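
The root cause in #3795: Java 8 gave ConcurrentHashMap a covariant override of keySet() returning the Java-8-only type ConcurrentHashMap.KeySetView. When Druid was compiled against the JDK 8 class libraries (even with -source/-target 1.7), call sites on a ConcurrentHashMap receiver were linked against that override, and a Java 7 JVM then failed with NoSuchMethodError. Below is a minimal sketch of the failure mode and of the workaround this patch adopts; the demo class and variable names are illustrative, not part of the patch:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class KeySetCompatDemo
    {
      public static void main(String[] args)
      {
        ConcurrentHashMap<String, Integer> concurrent = new ConcurrentHashMap<>();
        concurrent.put("a", 1);

        // Compiled on JDK 8, this call is emitted against the covariant
        // ConcurrentHashMap.keySet(), whose KeySetView return type does not
        // exist on Java 7, so a Java 7 JVM throws NoSuchMethodError here.
        Set<String> direct = concurrent.keySet();

        // Widening the receiver to Map makes javac emit the call against
        // Map.keySet(), which links on both Java 7 and Java 8. The new
        // JavaCompatUtils.keySet() helper in this patch does exactly this
        // through its Map-typed parameter.
        Map<String, Integer> widened = concurrent;
        Set<String> safe = widened.keySet();

        System.out.println(direct.equals(safe));
      }
    }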

View File

@@ -34,11 +34,11 @@ import com.alibaba.rocketmq.remoting.exception.RemotingException;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Sets;
 import io.druid.data.input.ByteBufferInputRowParser;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.InputRow;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.ParseException;
@@ -202,7 +202,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
     for (Map.Entry<String, Set<MessageQueue>> entry : topicQueueMap.entrySet()) {
       for (MessageQueue messageQueue : entry.getValue()) {
-        if (messageQueueTreeSetMap.keySet().contains(messageQueue)
+        if (JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(messageQueue)
             && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) {
           hasMore = true;
         } else {
@@ -251,7 +251,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
           MessageExt message = entry.getValue().pollFirst();
           InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody()));
-          if (!windows.keySet().contains(entry.getKey())) {
+          if (!JavaCompatUtils.keySet(windows).contains(entry.getKey())) {
             windows.put(entry.getKey(), new ConcurrentSkipListSet<Long>());
           }
           windows.get(entry.getKey()).add(message.getQueueOffset());
@@ -434,7 +434,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
       switch (pullResult.getPullStatus()) {
         case FOUND:
           // Handle pull result.
-          if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) {
+          if (!JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(pullRequest.getMessageQueue())) {
             messageQueueTreeSetMap.putIfAbsent(
                 pullRequest.getMessageQueue(),
                 new ConcurrentSkipListSet<>(new MessageComparator())

View File

@@ -75,6 +75,28 @@
         <target>1.8</target>
       </configuration>
     </plugin>
+    <plugin>
+      <groupId>org.codehaus.mojo</groupId>
+      <artifactId>animal-sniffer-maven-plugin</artifactId>
+      <executions>
+        <!-- Override Animal Sniffer configuration for the JDK8 target. -->
+        <execution>
+          <id>check-java-api</id>
+          <phase>test</phase>
+          <goals>
+            <goal>check</goal>
+          </goals>
+          <configuration>
+            <signature>
+              <groupId>org.codehaus.mojo.signature</groupId>
+              <artifactId>java18</artifactId>
+              <version>1.0</version>
+            </signature>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 </build>
 </project>
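
A note on this override: Maven merges plugin executions by id, so reusing the parent's check-java-api id replaces the root build's signature configuration for this module only; the plugin version is presumably inherited from the root pom shown further below. The java18 signature is needed here because caffeine-cache targets Java 8, as the <target>1.8</target> line above shows.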

View File

@@ -64,6 +64,7 @@ import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.supervisor.Supervisor;
 import io.druid.indexing.overlord.supervisor.SupervisorReport;
 import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.metadata.EntryExistsException;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -79,6 +80,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -134,6 +136,11 @@ public class KafkaSupervisor implements Supervisor
       this.partitionOffsets = partitionOffsets;
       this.minimumMessageTime = minimumMessageTime;
     }
+
+    Set<String> taskIds()
+    {
+      return JavaCompatUtils.keySet(tasks);
+    }
   }

   private class TaskData
@@ -951,9 +958,9 @@ public class KafkaSupervisor implements Supervisor
         log.warn(
             "All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]",
             groupId,
-            group.tasks.keySet()
+            group.taskIds()
         );
-        for (String id : group.tasks.keySet()) {
+        for (String id : group.taskIds()) {
           killTask(id);
         }
       }
@@ -1003,7 +1010,7 @@ public class KafkaSupervisor implements Supervisor
     // 2) Pause running tasks
     final List<ListenableFuture<Map<Integer, Long>>> pauseFutures = Lists.newArrayList();
-    final List<String> pauseTaskIds = ImmutableList.copyOf(taskGroup.tasks.keySet());
+    final List<String> pauseTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
     for (final String taskId : pauseTaskIds) {
       pauseFutures.add(taskClient.pauseAsync(taskId));
     }
@@ -1039,7 +1046,7 @@ public class KafkaSupervisor implements Supervisor
     // 4) Set the end offsets for each task to the values from step 3 and resume the tasks. All the tasks should
     //    finish reading and start publishing within a short period, depending on how in sync the tasks were.
     final List<ListenableFuture<Boolean>> setEndOffsetFutures = Lists.newArrayList();
-    final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.tasks.keySet());
+    final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());

     if (setEndOffsetTaskIds.isEmpty()) {
       log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
@@ -1124,7 +1131,7 @@ public class KafkaSupervisor implements Supervisor
           if (task.getValue().status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.tasks.keySet());
+            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
             futures.add(stopTasksInGroup(group));
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1138,7 +1145,7 @@ public class KafkaSupervisor implements Supervisor
           } else {
             log.makeAlert(
                 "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
-                group.tasks.keySet(),
+                group.taskIds(),
                 ioConfig.getCompletionTimeout()
             ).emit();
           }
@@ -1181,7 +1188,7 @@ public class KafkaSupervisor implements Supervisor
       // 2) Remove any tasks that have failed from the list
       // 3) If any task completed successfully, stop all the tasks in this group and move to the next group
-      log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.tasks.keySet());
+      log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());

       Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
@@ -1211,7 +1218,7 @@ public class KafkaSupervisor implements Supervisor
           break;
         }
       }
-      log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.tasks.keySet());
+      log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.taskIds());
     }

     // wait for all task shutdowns to complete before returning
@@ -1221,9 +1228,13 @@ public class KafkaSupervisor implements Supervisor
   void createNewTasks()
   {
     // check that there is a current task group for each group of partitions in [partitionGroups]
-    for (Integer groupId : partitionGroups.keySet()) {
+    for (Integer groupId : JavaCompatUtils.keySet(partitionGroups)) {
       if (!taskGroups.containsKey(groupId)) {
-        log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
+        log.info(
+            "Creating new task group [%d] for partitions %s",
+            groupId,
+            JavaCompatUtils.keySet(partitionGroups.get(groupId))
+        );

         Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
             DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get())
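
The small taskIds() helper added above keeps the workaround in one place, so the many call sites in this file read as group.taskIds() rather than repeating JavaCompatUtils.keySet(group.tasks).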

View File

@@ -26,6 +26,7 @@ import com.google.inject.Inject;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
 import io.druid.metadata.MetadataSupervisorManager;
@@ -56,7 +57,7 @@ public class SupervisorManager
   public Set<String> getSupervisorIds()
   {
-    return supervisors.keySet();
+    return JavaCompatUtils.keySet(supervisors);
   }

   public Optional<SupervisorSpec> getSupervisorSpec(String id)
@@ -114,7 +115,7 @@ public class SupervisorManager
     Preconditions.checkState(started, "SupervisorManager not started");

     synchronized (lock) {
-      for (String id : supervisors.keySet()) {
+      for (String id : JavaCompatUtils.keySet(supervisors)) {
         try {
           supervisors.get(id).lhs.stop(false);
         }

View File

@@ -0,0 +1,35 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common.collect;
+
+import java.util.Map;
+import java.util.Set;
+
+public class JavaCompatUtils
+{
+  /**
+   * Equivalent to theMap.keySet(), but works around a Java 7 compat issue. See also
+   * https://github.com/druid-io/druid/issues/3795.
+   */
+  public static <K, V> Set<K> keySet(Map<K, V> theMap)
+  {
+    return theMap.keySet();
+  }
+}
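
This helper works because its parameter is statically typed as Map<K, V>: the keySet() call inside the method is compiled against Map.keySet(), so callers can pass a ConcurrentHashMap without their own call sites ever referencing the Java-8-only KeySetView return type.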

pom.xml
View File

@@ -764,6 +764,34 @@
             </execution>
           </executions>
         </plugin>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>animal-sniffer-maven-plugin</artifactId>
+          <version>1.15</version>
+          <executions>
+            <execution>
+              <id>check-java-api</id>
+              <phase>test</phase>
+              <goals>
+                <goal>check</goal>
+              </goals>
+              <configuration>
+                <signature>
+                  <groupId>org.codehaus.mojo.signature</groupId>
+                  <artifactId>java17</artifactId>
+                  <version>1.0</version>
+                </signature>
+                <ignores>
+                  <!-- Some of our code uses DirectBuffer & Cleaner directly, which are not part of
+                       the JDK signature (although they are there anyway). -->
+                  <ignore>sun.nio.ch.DirectBuffer</ignore>
+                  <ignore>sun.misc.Cleaner</ignore>
+                </ignores>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
       </plugins>
       <pluginManagement>
         <plugins>
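
Binding the check-java-api execution to the test phase means the signature check runs on every ordinary build, so a Java-8-only reference like the keySet() case above now fails at build time instead of surfacing as a runtime NoSuchMethodError on Java 7. The java17 signature covers only the documented Java 7 API, which is why the sun.nio.ch.DirectBuffer and sun.misc.Cleaner internals that Druid uses must be listed as ignores.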

View File

@@ -179,7 +179,7 @@ public class CoordinatorRuleManager
             )
         );

-        log.info("Got [%,d] rules", newRules.keySet().size());
+        log.info("Got [%,d] rules", newRules.size());

         rules.set(newRules);
       }
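
This last change takes a simpler route than the others: newRules.size() is equivalent to newRules.keySet().size() for any Map, so the keySet() call can be dropped outright rather than routed through JavaCompatUtils.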