SOLR prometheus: simplify concurrent collection (#1723)

No semantic difference in behavior.
This commit is contained in:
David Smiley 2020-08-14 15:59:40 -04:00 committed by GitHub
parent ec1c5cfffe
commit e6a11f8c3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 189 deletions

View File

@ -18,7 +18,7 @@
package org.apache.solr.prometheus.collector;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -29,12 +29,12 @@ import org.apache.solr.prometheus.scraper.SolrScraper;
public class MetricsCollectorFactory {
private final MetricsConfiguration metricsConfiguration;
private final Executor executor;
private final ExecutorService executor;
private final int refreshInSeconds;
private final SolrScraper solrScraper;
public MetricsCollectorFactory(
Executor executor,
ExecutorService executor,
int refreshInSeconds,
SolrScraper solrScraper,
MetricsConfiguration metricsConfiguration) {

View File

@ -19,20 +19,20 @@ package org.apache.solr.prometheus.collector;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import io.prometheus.client.Collector;
import io.prometheus.client.Histogram;
import org.apache.solr.prometheus.exporter.SolrExporter;
import org.apache.solr.prometheus.scraper.Async;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,7 +53,7 @@ public class SchedulerMetricsCollector implements Closeable {
1,
new SolrNamedThreadFactory("scheduled-metrics-collector"));
private final Executor executor;
private final ExecutorService executor;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
@ -63,7 +63,7 @@ public class SchedulerMetricsCollector implements Closeable {
.register(SolrExporter.defaultRegistry);
public SchedulerMetricsCollector(
Executor executor,
ExecutorService executor,
int duration,
TimeUnit timeUnit,
List<MetricCollector> metricCollectors) {
@ -83,31 +83,27 @@ public class SchedulerMetricsCollector implements Closeable {
try (Histogram.Timer timer = metricsCollectionTime.startTimer()) {
log.info("Beginning metrics collection");
List<CompletableFuture<MetricSamples>> futures = new ArrayList<>();
for (MetricCollector metricsCollector : metricCollectors) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
return metricsCollector.collect();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, executor));
}
try {
CompletableFuture<List<MetricSamples>> sampleFuture = Async.waitForAllSuccessfulResponses(futures);
List<MetricSamples> samples = sampleFuture.get();
final List<Future<MetricSamples>> futures = executor.invokeAll(
metricCollectors.stream()
.map(metricCollector -> (Callable<MetricSamples>) metricCollector::collect)
.collect(Collectors.toList())
);
MetricSamples metricSamples = new MetricSamples();
samples.forEach(metricSamples::addAll);
for (Future<MetricSamples> future : futures) {
try {
metricSamples.addAll(future.get());
} catch (ExecutionException e) {
log.error("Error occurred during metrics collection", e.getCause());//logok
// continue any ways; do not fail
}
}
notifyObservers(metricSamples.asList());
log.info("Completed metrics collection");
} catch (InterruptedException | ExecutionException e) {
log.error("Error while waiting for metric collection to complete", e);
}
} catch (InterruptedException e) {
log.warn("Interrupted waiting for metric collection to complete", e);
Thread.currentThread().interrupt();
}
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.prometheus.scraper;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Async {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@SuppressWarnings({"rawtypes"})
public static <T> CompletableFuture<List<T>> waitForAllSuccessfulResponses(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> completed = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return completed.thenApply(values -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
).exceptionally(error -> {
futures.stream()
.filter(CompletableFuture::isCompletedExceptionally)
.forEach(future -> {
try {
future.get();
} catch (Exception exception) {
log.warn("Error occurred during metrics collection", exception);
}
});
return futures.stream()
.filter(future -> !(future.isCompletedExceptionally() || future.isCancelled()))
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
);
}
}

View File

@ -21,7 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -44,7 +44,7 @@ public class SolrCloudScraper extends SolrScraper {
private Cache<String, HttpSolrClient> hostClientCache = CacheBuilder.newBuilder().build();
public SolrCloudScraper(CloudSolrClient solrClient, Executor executor, SolrClientFactory solrClientFactory) {
public SolrCloudScraper(CloudSolrClient solrClient, ExecutorService executor, SolrClientFactory solrClientFactory) {
super(executor);
this.solrClient = solrClient;
this.solrClientFactory = solrClientFactory;

View File

@ -21,12 +21,11 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.prometheus.collector.MetricSamples;
import org.apache.solr.prometheus.exporter.MetricsQuery;
import org.apache.solr.prometheus.exporter.SolrExporter;
@ -59,7 +57,7 @@ public abstract class SolrScraper implements Closeable {
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final Executor executor;
protected final ExecutorService executor;
public abstract Map<String, MetricSamples> metricsForAllHosts(MetricsQuery query) throws IOException;
@ -69,7 +67,7 @@ public abstract class SolrScraper implements Closeable {
public abstract MetricSamples search(MetricsQuery query) throws IOException;
public abstract MetricSamples collections(MetricsQuery metricsQuery) throws IOException;
public SolrScraper(Executor executor) {
public SolrScraper(ExecutorService executor) {
this.executor = executor;
}
@ -77,17 +75,32 @@ public abstract class SolrScraper implements Closeable {
Collection<String> items,
Function<String, MetricSamples> samplesCallable) throws IOException {
List<CompletableFuture<Pair<String, MetricSamples>>> futures = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> new Pair<>(item, samplesCallable.apply(item)), executor))
.collect(Collectors.toList());
Future<List<Pair<String, MetricSamples>>> allComplete = Async.waitForAllSuccessfulResponses(futures);
Map<String, MetricSamples> result = new HashMap<>(); // sync on this when adding to it below
try {
return allComplete.get().stream().collect(Collectors.toMap(Pair::first, Pair::second));
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
// invoke each samplesCallable with each item and putting the results in the above "result" map.
executor.invokeAll(
items.stream()
.map(item -> (Callable<MetricSamples>) () -> {
try {
final MetricSamples samples = samplesCallable.apply(item);
synchronized (result) {
result.put(item, samples);
}
} catch (Exception e) {
// do NOT totally fail; just log and move on
log.warn("Error occurred during metrics collection", e);
}
return null;//not used
})
.collect(Collectors.toList())
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return result;
}
protected MetricSamples request(SolrClient client, MetricsQuery query) throws IOException {

View File

@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.solr.client.solrj.SolrServerException;
@ -38,7 +38,7 @@ public class SolrStandaloneScraper extends SolrScraper {
private final HttpSolrClient solrClient;
public SolrStandaloneScraper(HttpSolrClient solrClient, Executor executor) {
public SolrStandaloneScraper(HttpSolrClient solrClient, ExecutorService executor) {
super(executor);
this.solrClient = solrClient;
}

View File

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.prometheus.scraper;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AsyncTest {
private CompletableFuture<Integer> failedFuture() {
CompletableFuture<Integer> result = new CompletableFuture<>();
result.completeExceptionally(new RuntimeException("Some error"));
return result;
}
@Test
public void getAllResults() throws Exception {
List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(
expectedValues.stream()
.map(CompletableFuture::completedFuture)
.collect(Collectors.toList()));
List<Integer> actualValues = results.get();
Collections.sort(expectedValues);
Collections.sort(actualValues);
assertEquals(expectedValues, actualValues);
}
@Test
public void ignoresFailures() throws Exception {
CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Arrays.asList(
CompletableFuture.completedFuture(1),
failedFuture()
));
List<Integer> values = results.get();
assertEquals(Collections.singletonList(1), values);
}
@Test
public void allFuturesFail() throws Exception {
CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Collections.singletonList(
failedFuture()
));
List<Integer> values = results.get();
assertTrue(values.isEmpty());
}
}