diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java
index db9b9accfa5..94b7bb79cde 100644
--- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java
+++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java
@@ -34,11 +34,11 @@ import com.alibaba.rocketmq.remoting.exception.RemotingException;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Sets;
-
 import io.druid.data.input.ByteBufferInputRowParser;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.InputRow;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.ParseException;
@@ -202,7 +202,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
       for (Map.Entry<String, Set<MessageQueue>> entry : topicQueueMap.entrySet()) {
         for (MessageQueue messageQueue : entry.getValue()) {
-          if (messageQueueTreeSetMap.keySet().contains(messageQueue)
+          if (JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(messageQueue)
               && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) {
             hasMore = true;
           } else {
@@ -251,7 +251,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
-            if (!windows.keySet().contains(entry.getKey())) {
+            if (!JavaCompatUtils.keySet(windows).contains(entry.getKey())) {
               windows.put(entry.getKey(), new ConcurrentSkipListSet<Long>());
             }
             windows.get(entry.getKey()).add(message.getQueueOffset());
@@ -434,7 +434,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
               new ConcurrentSkipListSet<MessageExt>(new MessageComparator())
diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml
index 58259ccf3dd..f7542162406 100644
--- a/extensions-core/caffeine-cache/pom.xml
+++ b/extensions-core/caffeine-cache/pom.xml
@@ -75,6 +75,28 @@
             <target>1.8</target>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>animal-sniffer-maven-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>check-java-api</id>
+              <phase>test</phase>
+              <goals>
+                <goal>check</goal>
+              </goals>
+              <configuration>
+                <signature>
+                  <groupId>org.codehaus.mojo.signature</groupId>
+                  <artifactId>java18</artifactId>
+                  <version>1.0</version>
+                </signature>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 3abf26abad3..3769fe10966 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -64,6 +64,7 @@ import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.supervisor.Supervisor;
 import io.druid.indexing.overlord.supervisor.SupervisorReport;
 import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.metadata.EntryExistsException;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -79,6 +80,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -134,6 +136,11 @@ public class KafkaSupervisor implements Supervisor
       this.partitionOffsets = partitionOffsets;
       this.minimumMessageTime = minimumMessageTime;
     }
+
+    Set<String> taskIds()
+    {
+      return JavaCompatUtils.keySet(tasks);
+    }
   }
 
   private class TaskData
@@ -951,9 +958,9 @@ public class KafkaSupervisor implements Supervisor
         log.warn(
             "All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]",
             groupId,
-            group.tasks.keySet()
+            group.taskIds()
         );
-        for (String id : group.tasks.keySet()) {
+        for (String id : group.taskIds()) {
           killTask(id);
         }
       }
@@ -1003,7 +1010,7 @@ public class KafkaSupervisor implements Supervisor
     // 2) Pause running tasks
     final List<ListenableFuture<Map<Integer, Long>>> pauseFutures = Lists.newArrayList();
-    final List<String> pauseTaskIds = ImmutableList.copyOf(taskGroup.tasks.keySet());
+    final List<String> pauseTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
     for (final String taskId : pauseTaskIds) {
       pauseFutures.add(taskClient.pauseAsync(taskId));
     }
@@ -1039,7 +1046,7 @@ public class KafkaSupervisor implements Supervisor
     // 4) Set the end offsets for each task to the values from step 3 and resume the tasks. All the tasks should
     //    finish reading and start publishing within a short period, depending on how in sync the tasks were.
     final List<ListenableFuture<Boolean>> setEndOffsetFutures = Lists.newArrayList();
-    final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.tasks.keySet());
+    final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
 
     if (setEndOffsetTaskIds.isEmpty()) {
       log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
@@ -1124,7 +1131,7 @@ public class KafkaSupervisor implements Supervisor
           if (task.getValue().status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.tasks.keySet());
+            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
             futures.add(stopTasksInGroup(group));
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1138,7 +1145,7 @@ public class KafkaSupervisor implements Supervisor
           } else {
             log.makeAlert(
                 "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
-                group.tasks.keySet(),
+                group.taskIds(),
                 ioConfig.getCompletionTimeout()
             ).emit();
           }
@@ -1181,7 +1188,7 @@ public class KafkaSupervisor implements Supervisor
      // 2) Remove any tasks that have failed from the list
      // 3) If any task completed successfully, stop all the tasks in this group and move to the next group
 
-      log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.tasks.keySet());
+      log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());
 
      Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
      while (iTasks.hasNext()) {
@@ -1211,7 +1218,7 @@ public class KafkaSupervisor implements Supervisor
            break;
          }
        }
-      log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.tasks.keySet());
+      log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.taskIds());
     }
 
     // wait for all task shutdowns to complete before returning
@@ -1221,9 +1228,13 @@ public class KafkaSupervisor implements Supervisor
   void createNewTasks()
   {
     // check that there is a current task group for each group of partitions in [partitionGroups]
-    for (Integer groupId : partitionGroups.keySet()) {
+    for (Integer groupId : JavaCompatUtils.keySet(partitionGroups)) {
       if (!taskGroups.containsKey(groupId)) {
-        log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
+        log.info(
+            "Creating new task group [%d] for partitions %s",
+            groupId,
+            JavaCompatUtils.keySet(partitionGroups.get(groupId))
+        );
         Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
             DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get())
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 37e201a0696..505eec19a23 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject;
 import com.metamx.emitter.EmittingLogger;
 
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.collect.JavaCompatUtils;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
 import io.druid.metadata.MetadataSupervisorManager;
@@ -56,7 +57,7 @@ public class SupervisorManager
 
   public Set<String> getSupervisorIds()
   {
-    return supervisors.keySet();
+    return JavaCompatUtils.keySet(supervisors);
   }
 
   public Optional<SupervisorSpec> getSupervisorSpec(String id)
@@ -114,7 +115,7 @@ public class SupervisorManager
     Preconditions.checkState(started, "SupervisorManager not started");
 
     synchronized (lock) {
-      for (String id : supervisors.keySet()) {
+      for (String id : JavaCompatUtils.keySet(supervisors)) {
        try {
          supervisors.get(id).lhs.stop(false);
        }
diff --git a/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java b/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java
new file mode 100644
index 00000000000..95a307bee67
--- /dev/null
+++ b/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common.collect;
+
+import java.util.Map;
+import java.util.Set;
+
+public class JavaCompatUtils
+{
+  /**
+   * Equivalent to theMap.keySet(), but works around a Java 7 compat issue. See also
+   * https://github.com/druid-io/druid/issues/3795.
+   */
+  public static <K, V> Set<K> keySet(Map<K, V> theMap)
+  {
+    return theMap.keySet();
+  }
+}
diff --git a/pom.xml b/pom.xml
index 029ef5fb03b..fdddcf19357 100644
--- a/pom.xml
+++ b/pom.xml
@@ -764,6 +764,34 @@
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>animal-sniffer-maven-plugin</artifactId>
+                <version>1.15</version>
+                <executions>
+                    <execution>
+                        <id>check-java-api</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <configuration>
+                            <signature>
+                                <groupId>org.codehaus.mojo.signature</groupId>
+                                <artifactId>java17</artifactId>
+                                <version>1.0</version>
+                            </signature>
+                            <ignores>
+                                <ignore>sun.nio.ch.DirectBuffer</ignore>
+                                <ignore>sun.misc.Cleaner</ignore>
+                            </ignores>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
index cdcaf49cf34..66bcdb0d2e5 100644
--- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
+++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
@@ -179,7 +179,7 @@ public class CoordinatorRuleManager
               )
           );
 
-      log.info("Got [%,d] rules", newRules.keySet().size());
+      log.info("Got [%,d] rules", newRules.size());
 
       rules.set(newRules);
     }
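
Note (not part of the patch): the incompatibility that JavaCompatUtils.keySet() and the animal-sniffer checks above guard against is a binary-compatibility problem rather than a source-level one. When a call site whose static receiver type is ConcurrentHashMap is compiled on JDK 8, the emitted method reference records the Java 8-only return type ConcurrentHashMap.KeySetView, so the same class file throws NoSuchMethodError on a Java 7 runtime even when built with -source/-target 1.7 (see issue #3795). A minimal sketch of the pitfall and of the workaround used here, with a hypothetical demo class that is not part of Druid:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, for illustration only.
public class KeySetCompatDemo
{
  public static void main(String[] args)
  {
    ConcurrentHashMap<String, Long> offsets = new ConcurrentHashMap<>();
    offsets.put("partition-0", 42L);

    // Compiled on JDK 8, this call binds to ConcurrentHashMap.keySet(), whose return
    // type is the Java 8-only ConcurrentHashMap.KeySetView; the resulting class file
    // fails with NoSuchMethodError on a Java 7 JRE.
    Set<String> direct = offsets.keySet();

    // Routing the call through a Map-typed reference (which is what
    // JavaCompatUtils.keySet(theMap) does via its Map<K, V> parameter) binds to
    // Map.keySet(), which returns plain java.util.Set on every Java version.
    Map<String, Long> asMap = offsets;
    Set<String> portable = asMap.keySet();

    System.out.println(direct.equals(portable)); // true: both view the same keys
  }
}

The animal-sniffer-maven-plugin executions added to the poms catch this class of mistake at build time by checking compiled classes against the java17 API signature (java18 in the caffeine-cache module, which deliberately targets Java 8).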