diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java index 1ad98d1b63d..fdf8c8ea83f 100644 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java +++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java @@ -18,7 +18,7 @@ package org.apache.solr.prometheus.collector; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -29,12 +29,12 @@ import org.apache.solr.prometheus.scraper.SolrScraper; public class MetricsCollectorFactory { private final MetricsConfiguration metricsConfiguration; - private final Executor executor; + private final ExecutorService executor; private final int refreshInSeconds; private final SolrScraper solrScraper; public MetricsCollectorFactory( - Executor executor, + ExecutorService executor, int refreshInSeconds, SolrScraper solrScraper, MetricsConfiguration metricsConfiguration) { diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java index 53b0aa1c6de..62763dfd73c 100644 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java +++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java @@ -19,20 +19,20 @@ package org.apache.solr.prometheus.collector; import java.io.Closeable; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import io.prometheus.client.Collector; import io.prometheus.client.Histogram; import org.apache.solr.prometheus.exporter.SolrExporter; -import org.apache.solr.prometheus.scraper.Async; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ public class SchedulerMetricsCollector implements Closeable { 1, new SolrNamedThreadFactory("scheduled-metrics-collector")); - private final Executor executor; + private final ExecutorService executor; private final List observers = new CopyOnWriteArrayList<>(); @@ -63,7 +63,7 @@ public class SchedulerMetricsCollector implements Closeable { .register(SolrExporter.defaultRegistry); public SchedulerMetricsCollector( - Executor executor, + ExecutorService executor, int duration, TimeUnit timeUnit, List metricCollectors) { @@ -83,31 +83,27 @@ public class SchedulerMetricsCollector implements Closeable { try (Histogram.Timer timer = metricsCollectionTime.startTimer()) { log.info("Beginning metrics collection"); - List> futures = new ArrayList<>(); - - for (MetricCollector metricsCollector : metricCollectors) { - futures.add(CompletableFuture.supplyAsync(() -> { - try { - return metricsCollector.collect(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, executor)); + final List> futures = executor.invokeAll( + metricCollectors.stream() + .map(metricCollector -> (Callable) metricCollector::collect) + .collect(Collectors.toList()) + ); + MetricSamples metricSamples = new MetricSamples(); + for (Future future : futures) { + try { + metricSamples.addAll(future.get()); + } catch (ExecutionException e) { + log.error("Error occurred during metrics collection", e.getCause());//logok + // continue any ways; do not fail + } } - try { - CompletableFuture> sampleFuture = Async.waitForAllSuccessfulResponses(futures); - List samples = sampleFuture.get(); + notifyObservers(metricSamples.asList()); - MetricSamples metricSamples = new MetricSamples(); - samples.forEach(metricSamples::addAll); - - notifyObservers(metricSamples.asList()); - - log.info("Completed metrics collection"); - } catch (InterruptedException | ExecutionException e) { - log.error("Error while waiting for metric collection to complete", e); - } + log.info("Completed metrics collection"); + } catch (InterruptedException e) { + log.warn("Interrupted waiting for metric collection to complete", e); + Thread.currentThread().interrupt(); } } diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java deleted file mode 100644 index 2b8c763e3fe..00000000000 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.prometheus.scraper; - -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Async { - - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - @SuppressWarnings({"rawtypes"}) - public static CompletableFuture> waitForAllSuccessfulResponses(List> futures) { - CompletableFuture completed = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - - return completed.thenApply(values -> { - return futures.stream() - .map(CompletableFuture::join) - .collect(Collectors.toList()); - } - ).exceptionally(error -> { - futures.stream() - .filter(CompletableFuture::isCompletedExceptionally) - .forEach(future -> { - try { - future.get(); - } catch (Exception exception) { - log.warn("Error occurred during metrics collection", exception); - } - }); - - return futures.stream() - .filter(future -> !(future.isCompletedExceptionally() || future.isCancelled())) - .map(CompletableFuture::join) - .collect(Collectors.toList()); - } - ); - } - - -} diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java index 896ea276402..e4b98e75811 100644 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java +++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -44,7 +44,7 @@ public class SolrCloudScraper extends SolrScraper { private Cache hostClientCache = CacheBuilder.newBuilder().build(); - public SolrCloudScraper(CloudSolrClient solrClient, Executor executor, SolrClientFactory solrClientFactory) { + public SolrCloudScraper(CloudSolrClient solrClient, ExecutorService executor, SolrClientFactory solrClientFactory) { super(executor); this.solrClient = solrClient; this.solrClientFactory = solrClientFactory; diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java index bbbfc200616..c1ee6aacb38 100644 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java +++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java @@ -21,12 +21,11 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.util.NamedList; -import org.apache.solr.common.util.Pair; import org.apache.solr.prometheus.collector.MetricSamples; import org.apache.solr.prometheus.exporter.MetricsQuery; import org.apache.solr.prometheus.exporter.SolrExporter; @@ -59,7 +57,7 @@ public abstract class SolrScraper implements Closeable { protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - protected final Executor executor; + protected final ExecutorService executor; public abstract Map metricsForAllHosts(MetricsQuery query) throws IOException; @@ -69,7 +67,7 @@ public abstract class SolrScraper implements Closeable { public abstract MetricSamples search(MetricsQuery query) throws IOException; public abstract MetricSamples collections(MetricsQuery metricsQuery) throws IOException; - public SolrScraper(Executor executor) { + public SolrScraper(ExecutorService executor) { this.executor = executor; } @@ -77,17 +75,32 @@ public abstract class SolrScraper implements Closeable { Collection items, Function samplesCallable) throws IOException { - List>> futures = items.stream() - .map(item -> CompletableFuture.supplyAsync(() -> new Pair<>(item, samplesCallable.apply(item)), executor)) - .collect(Collectors.toList()); - - Future>> allComplete = Async.waitForAllSuccessfulResponses(futures); + Map result = new HashMap<>(); // sync on this when adding to it below try { - return allComplete.get().stream().collect(Collectors.toMap(Pair::first, Pair::second)); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); + // invoke each samplesCallable with each item and putting the results in the above "result" map. + executor.invokeAll( + items.stream() + .map(item -> (Callable) () -> { + try { + final MetricSamples samples = samplesCallable.apply(item); + synchronized (result) { + result.put(item, samples); + } + } catch (Exception e) { + // do NOT totally fail; just log and move on + log.warn("Error occurred during metrics collection", e); + } + return null;//not used + }) + .collect(Collectors.toList()) + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } + + return result; } protected MetricSamples request(SolrClient client, MetricsQuery query) throws IOException { diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java index 8c1ee78d1ca..4bd8370cf5b 100644 --- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java +++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java @@ -22,7 +22,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import com.fasterxml.jackson.databind.JsonNode; import org.apache.solr.client.solrj.SolrServerException; @@ -38,7 +38,7 @@ public class SolrStandaloneScraper extends SolrScraper { private final HttpSolrClient solrClient; - public SolrStandaloneScraper(HttpSolrClient solrClient, Executor executor) { + public SolrStandaloneScraper(HttpSolrClient solrClient, ExecutorService executor) { super(executor); this.solrClient = solrClient; } diff --git a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java b/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java deleted file mode 100644 index 0959bd4348d..00000000000 --- a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.prometheus.scraper; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class AsyncTest { - - private CompletableFuture failedFuture() { - CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(new RuntimeException("Some error")); - return result; - } - - @Test - public void getAllResults() throws Exception { - List expectedValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7); - - CompletableFuture> results = Async.waitForAllSuccessfulResponses( - expectedValues.stream() - .map(CompletableFuture::completedFuture) - .collect(Collectors.toList())); - - List actualValues = results.get(); - - Collections.sort(expectedValues); - Collections.sort(actualValues); - - assertEquals(expectedValues, actualValues); - } - - @Test - public void ignoresFailures() throws Exception { - CompletableFuture> results = Async.waitForAllSuccessfulResponses(Arrays.asList( - CompletableFuture.completedFuture(1), - failedFuture() - )); - - List values = results.get(); - - assertEquals(Collections.singletonList(1), values); - } - - @Test - public void allFuturesFail() throws Exception { - CompletableFuture> results = Async.waitForAllSuccessfulResponses(Collections.singletonList( - failedFuture() - )); - - List values = results.get(); - - assertTrue(values.isEmpty()); - } -} \ No newline at end of file