From eabb05f7fd52e9e9946160101607052b14b2cfe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 22 Sep 2016 13:37:13 +0200 Subject: [PATCH 1/7] SOLR-8186: Added robustness to the dynamic log muting logic --- .../solr/servlet/SolrDispatchFilter.java | 19 +---- .../solr/servlet/StartupLoggingUtils.java | 85 +++++++++++++++++++ 2 files changed, 86 insertions(+), 18 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/servlet/StartupLoggingUtils.java diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 0c3a2f3e57b..cda315424eb 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -37,7 +37,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Enumeration; import java.util.Locale; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,14 +48,10 @@ import org.apache.commons.io.input.CloseShieldInputStream; import org.apache.commons.io.output.CloseShieldOutputStream; import org.apache.commons.lang.StringUtils; import org.apache.http.client.HttpClient; -import org.apache.log4j.Appender; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.LogManager; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.util.ExecutorUtil; -import org.apache.solr.common.util.SuppressForbidden; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.NodeConfig; import org.apache.solr.core.SolrCore; @@ -124,7 +119,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { { String muteConsole = System.getProperty(SOLR_LOG_MUTECONSOLE); if (muteConsole != null && !Arrays.asList("false","0","off","no").contains(muteConsole.toLowerCase(Locale.ROOT))) { - muteConsole(); + StartupLoggingUtils.muteConsole(); } log.info("SolrDispatchFilter.init(): {}", this.getClass().getClassLoader()); @@ -161,18 +156,6 @@ public class SolrDispatchFilter extends BaseSolrFilter { log.info("SolrDispatchFilter.init() done"); } - @SuppressForbidden(reason = "Legitimate log4j access") - private void muteConsole() { - Enumeration appenders = LogManager.getRootLogger().getAllAppenders(); - while (appenders.hasMoreElements()) { - Appender appender = (Appender) appenders.nextElement(); - if (appender instanceof ConsoleAppender) { - log.info("Property solr.log.muteconsole given. Muting ConsoleAppender named " + appender.getName()); - LogManager.getRootLogger().removeAppender(appender); - } - } - } - /** * Override this to change CoreContainer initialization * @return a CoreContainer to hold this server's cores diff --git a/solr/core/src/java/org/apache/solr/servlet/StartupLoggingUtils.java b/solr/core/src/java/org/apache/solr/servlet/StartupLoggingUtils.java new file mode 100644 index 00000000000..fbcebebc52d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/StartupLoggingUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.util.Enumeration; + +import org.apache.log4j.Appender; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.LogManager; +import org.apache.solr.common.util.SuppressForbidden; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.impl.StaticLoggerBinder; + +/** + * Handles dynamic modification of during startup, before CoreContainer is created + *

+ * WARNING: This class should only be used during startup. For modifying log levels etc + * during runtime, SLF4J and LogWatcher must be used. + *

+ */ +final class StartupLoggingUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final static StaticLoggerBinder binder = StaticLoggerBinder.getSingleton(); + + /** + * Disables all log4j ConsoleAppender's by modifying log4j configuration dynamically. + * Must only be used during early startup + * @return true if ok or else false if something happened, e.g. log4j classes were not in classpath + */ + @SuppressForbidden(reason = "Legitimate log4j access") + static boolean muteConsole() { + try { + if (!isLog4jActive()) { + logNotSupported("Could not mute logging to console."); + return false; + } + org.apache.log4j.Logger rootLogger = LogManager.getRootLogger(); + Enumeration appenders = rootLogger.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender appender = (Appender) appenders.nextElement(); + if (appender instanceof ConsoleAppender) { + log.info("Property solr.log.muteconsole given. Muting ConsoleAppender named " + appender.getName()); + rootLogger.removeAppender(appender); + } + } + return true; + } catch (Exception e) { + logNotSupported("Could not mute logging to console."); + return false; + } + } + + private static boolean isLog4jActive() { + try { + // Make sure we have log4j LogManager in classpath + Class.forName("org.apache.log4j.LogManager"); + // Make sure that log4j is really selected as logger in slf4j - we could have LogManager in the bridge class :) + return binder.getLoggerFactoryClassStr().contains("Log4jLoggerFactory"); + } catch (Exception e) { + return false; + } + } + + private static void logNotSupported(String msg) { + log.warn("{} Dynamic log manipulation currently only supported for Log4j. " + + "Please consult your logging framework of choice on how to configure the appropriate logging.", msg); + } +} From 75e69c5198c02e6635eed274b03ea759ef1c4818 Mon Sep 17 00:00:00 2001 From: Joel Date: Thu, 22 Sep 2016 07:58:25 -0400 Subject: [PATCH 2/7] SOLR-9549: Fix bug in advancing docValues --- .../apache/solr/search/TextLogisticRegressionQParserPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/search/TextLogisticRegressionQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/TextLogisticRegressionQParserPlugin.java index 96f869faa8a..e1d3b7b8479 100644 --- a/solr/core/src/java/org/apache/solr/search/TextLogisticRegressionQParserPlugin.java +++ b/solr/core/src/java/org/apache/solr/search/TextLogisticRegressionQParserPlugin.java @@ -153,7 +153,7 @@ public class TextLogisticRegressionQParserPlugin extends QParserPlugin { public void collect(int doc) throws IOException{ int valuesDocID = leafOutcomeValue.docID(); if (valuesDocID < doc) { - valuesDocID = leafOutcomeValue.advance(valuesDocID); + valuesDocID = leafOutcomeValue.advance(doc); } int outcome; if (valuesDocID == doc) { From a41dd86e3cbb7799c2a6937d7bc0a07ec3c6e1a8 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Thu, 22 Sep 2016 13:21:23 +0100 Subject: [PATCH 3/7] SOLR-9305, SOLR-9390: *really* dont' use directToLeaders --- .../src/test/org/apache/solr/cloud/HttpPartitionTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index faf3309a65e..82fbec0c714 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -87,7 +87,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { */ @Override protected CloudSolrClient createCloudClient(String defaultCollection) { - CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), false); + CloudSolrClient client = new CloudSolrClient.Builder() + .withZkHost(zkServer.getZkAddress()) + .sendDirectUpdatesToAnyShardReplica() + .build(); client.setParallelUpdates(random().nextBoolean()); if (defaultCollection != null) client.setDefaultCollection(defaultCollection); client.getLbClient().setConnectionTimeout(30000); From c55a14e198072c16a834d5b3683c5edaa0c67e5d Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 21 Sep 2016 18:46:52 +0100 Subject: [PATCH 4/7] SOLR-9544: Give ObjectReleaseTracker more time for async closing objects --- solr/CHANGES.txt | 3 +++ .../solr/util/TestObjectReleaseTracker.java | 12 +++++------ .../common/util/ObjectReleaseTracker.java | 21 +++++++++++++------ .../java/org/apache/solr/SolrTestCaseJ4.java | 2 +- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 99bfabc3306..97eb8bd2895 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -164,6 +164,9 @@ Other Changes * SOLR-5563: Move lots of SolrCloud logging from 'info' to 'debug' (janhoy, Alan Woodward) +* SOLR-9544: Allow ObjectReleaseTracker more time to check for asynchronously + closing resources (Alan Woodward) + ================== 6.2.1 ================== Bug Fixes diff --git a/solr/core/src/test/org/apache/solr/util/TestObjectReleaseTracker.java b/solr/core/src/test/org/apache/solr/util/TestObjectReleaseTracker.java index da50fd8ebe0..f7e69433046 100644 --- a/solr/core/src/test/org/apache/solr/util/TestObjectReleaseTracker.java +++ b/solr/core/src/test/org/apache/solr/util/TestObjectReleaseTracker.java @@ -29,12 +29,12 @@ public class TestObjectReleaseTracker extends LuceneTestCase { public void testObjectReleaseTracker() { ObjectReleaseTracker.track(new Object()); ObjectReleaseTracker.release(new Object()); - assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); - assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); + assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); + assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); Object obj = new Object(); ObjectReleaseTracker.track(obj); ObjectReleaseTracker.release(obj); - assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); + assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); Object obj1 = new Object(); ObjectReleaseTracker.track(obj1); @@ -46,7 +46,7 @@ public class TestObjectReleaseTracker extends LuceneTestCase { ObjectReleaseTracker.release(obj1); ObjectReleaseTracker.release(obj2); ObjectReleaseTracker.release(obj3); - assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); + assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); ObjectReleaseTracker.track(obj1); ObjectReleaseTracker.track(obj2); @@ -55,7 +55,7 @@ public class TestObjectReleaseTracker extends LuceneTestCase { ObjectReleaseTracker.release(obj1); ObjectReleaseTracker.release(obj2); // ObjectReleaseTracker.release(obj3); - assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); - assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()); + assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); + assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1)); } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java index 3fc0546e820..2d7c1a6a580 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +57,18 @@ public class ObjectReleaseTracker { /** * @return null if ok else error message */ - public static String clearObjectTrackerAndCheckEmpty() { - String result = checkEmpty(); + public static String clearObjectTrackerAndCheckEmpty(int waitSeconds) { + int retries = 0; + String result; + do { + result = checkEmpty(); + if (result == null) + break; + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { break; } + } + while (retries++ < waitSeconds); OBJECTS.clear(); @@ -77,11 +88,9 @@ public class ObjectReleaseTracker { objects.add(entry.getKey().getClass().getSimpleName()); } - error = "ObjectTracker found " + entries.size() + " object(s) that were not released!!! " + objects; - - System.err.println(error); + error = "ObjectTracker found " + entries.size() + " object(s) that were not released!!! " + objects + "\n"; for (Entry entry : entries) { - System.err.println(entry.getValue()); + error += entry.getValue() + "\n"; } } diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index ea70805ab2a..fca0223c0ba 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -253,7 +253,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase { // if the tests passed, make sure everything was closed / released if (!RandomizedContext.current().getTargetClass().isAnnotationPresent(SuppressObjectReleaseTracker.class)) { endTrackingSearchers(120, false); - String orr = ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(); + String orr = ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(30); assertNull(orr, orr); } else { endTrackingSearchers(15, false); From 7377d0ef9ea8fa9e2aa9a3ccb1249703d8d1d813 Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Thu, 22 Sep 2016 10:02:38 -0400 Subject: [PATCH 5/7] LUCENE-7407: fix stale javadocs --- .../java/org/apache/lucene/index/SortedNumericDocValues.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/SortedNumericDocValues.java b/lucene/core/src/java/org/apache/lucene/index/SortedNumericDocValues.java index 0a888f81900..8c114957956 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SortedNumericDocValues.java +++ b/lucene/core/src/java/org/apache/lucene/index/SortedNumericDocValues.java @@ -32,8 +32,8 @@ public abstract class SortedNumericDocValues extends DocIdSetIterator { protected SortedNumericDocValues() {} /** - * Retrieve the value for the current document at the specified index. - * An index ranges from {@code 0} to {@code count()-1}. + * Iterates to the next value in the current document. Do not call this more than {@link #docValueCount} times + * for the document. */ public abstract long nextValue() throws IOException; From f391d57075ca4bbb5608079bec63d9a6a574308f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 22 Sep 2016 17:03:24 +0200 Subject: [PATCH 6/7] SOLR-6677: Reduced logging during Solr startup, moved more logs to DEBUG level --- solr/CHANGES.txt | 2 ++ .../solr/core/CachingDirectoryFactory.java | 2 +- .../apache/solr/core/ConfigSetProperties.java | 2 +- .../org/apache/solr/core/CoreContainer.java | 14 ++++---- .../solr/core/CorePropertiesLocator.java | 12 ++++--- .../org/apache/solr/core/JmxMonitoredMap.java | 20 +++++------ .../java/org/apache/solr/core/PluginBag.java | 9 +++-- .../apache/solr/core/SchemaCodecFactory.java | 4 +-- .../java/org/apache/solr/core/SolrConfig.java | 10 +++--- .../java/org/apache/solr/core/SolrCore.java | 24 ++++++------- .../apache/solr/core/SolrResourceLoader.java | 35 ++++++++++++++----- .../SolrSnapshotMetaDataManager.java | 2 +- .../component/HttpShardHandlerFactory.java | 2 +- .../apache/solr/handler/loader/XMLLoader.java | 2 +- .../org/apache/solr/logging/LogWatcher.java | 8 ++--- .../org/apache/solr/schema/CurrencyField.java | 2 +- .../org/apache/solr/schema/IndexSchema.java | 11 +++--- .../schema/OpenExchangeRatesOrgProvider.java | 4 +-- .../solr/servlet/SolrDispatchFilter.java | 5 +-- .../solr/update/UpdateShardHandler.java | 2 +- solr/server/resources/log4j.properties | 3 ++ 21 files changed, 106 insertions(+), 69 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 97eb8bd2895..94f30c787d8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -167,6 +167,8 @@ Other Changes * SOLR-9544: Allow ObjectReleaseTracker more time to check for asynchronously closing resources (Alan Woodward) +* SOLR-6677: Reduced logging during Solr startup, moved more logs to DEBUG level (janhoy, Shawn Heisey) + ================== 6.2.1 ================== Bug Fixes diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java index 5b7ad1b5a53..6a46843c60c 100644 --- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java @@ -350,7 +350,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory { CacheValue newCacheValue = new CacheValue(fullPath, directory); byDirectoryCache.put(directory, newCacheValue); byPathCache.put(fullPath, newCacheValue); - log.info("return new directory for " + fullPath); + log.debug("return new directory for " + fullPath); success = true; } finally { if (!success) { diff --git a/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java b/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java index ca768edc124..004b166f818 100644 --- a/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java +++ b/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java @@ -51,7 +51,7 @@ public class ConfigSetProperties { try { reader = new InputStreamReader(loader.openResource(name), StandardCharsets.UTF_8); } catch (SolrResourceNotFoundException ex) { - log.info("Did not find ConfigSet properties, assuming default properties: " + ex.getMessage()); + log.debug("Did not find ConfigSet properties, assuming default properties: " + ex.getMessage()); return null; } catch (Exception ex) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to load reader for ConfigSet properties: " + name, ex); diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 2a7e516cf88..fa8a8c0945e 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -189,7 +189,7 @@ public class CoreContainer { // private ClientConnectionManager clientConnectionManager = new PoolingClientConnectionManager(); { - log.info("New CoreContainer " + System.identityHashCode(this)); + log.debug("New CoreContainer " + System.identityHashCode(this)); } /** @@ -273,7 +273,7 @@ public class CoreContainer { // Read and pass the authorization context to the plugin authorizationPlugin.plugin.init(authorizationConf); } else { - log.info("Security conf doesn't exist. Skipping setup for authorization module."); + log.debug("Security conf doesn't exist. Skipping setup for authorization module."); } this.authorizationPlugin = authorizationPlugin; if (old != null) { @@ -302,7 +302,7 @@ public class CoreContainer { log.info("Authentication plugin class obtained from system property '" + AUTHENTICATION_PLUGIN_PROP + "': " + pluginClassName); } else { - log.info("No authentication plugin used."); + log.debug("No authentication plugin used."); } SecurityPluginHolder old = authenticationPlugin; SecurityPluginHolder authenticationPlugin = null; @@ -335,7 +335,7 @@ public class CoreContainer { // The default http client of the core container's shardHandlerFactory has already been created and // configured using the default httpclient configurer. We need to reconfigure it using the plugin's // http client configurer to set it up for internode communication. - log.info("Reconfiguring HttpClient settings."); + log.debug("Reconfiguring HttpClient settings."); SolrHttpClientContextBuilder httpClientBuilder = new SolrHttpClientContextBuilder(); if (builder.getCredentialsProviderProvider() != null) { @@ -431,7 +431,7 @@ public class CoreContainer { * Load the cores defined for this CoreContainer */ public void load() { - log.info("Loading cores into CoreContainer [instanceDir={}]", loader.getInstancePath()); + log.debug("Loading cores into CoreContainer [instanceDir={}]", loader.getInstancePath()); // add the sharedLib to the shared resource loader before initializing cfg based plugins String libDir = cfg.getSharedLibDirectory(); @@ -742,14 +742,14 @@ public class CoreContainer { coreInitFailures.remove(name); if( old == null || old == core) { - log.info( "registering core: "+name ); + log.debug( "registering core: "+name ); if (registerInZk) { zkSys.registerInZk(core, false); } return null; } else { - log.info( "replacing core: "+name ); + log.debug( "replacing core: "+name ); old.close(); if (registerInZk) { zkSys.registerInZk(core, false); diff --git a/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java b/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java index 30038c5dfca..b37402b97e2 100644 --- a/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java +++ b/solr/core/src/java/org/apache/solr/core/CorePropertiesLocator.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.Lists; import org.apache.solr.common.SolrException; @@ -54,7 +55,7 @@ public class CorePropertiesLocator implements CoresLocator { public CorePropertiesLocator(Path coreDiscoveryRoot) { this.rootDirectory = coreDiscoveryRoot; - logger.info("Config-defined core root directory: {}", this.rootDirectory); + logger.debug("Config-defined core root directory: {}", this.rootDirectory); } @Override @@ -122,7 +123,7 @@ public class CorePropertiesLocator implements CoresLocator { @Override public List discover(final CoreContainer cc) { - logger.info("Looking for core definitions underneath {}", rootDirectory); + logger.debug("Looking for core definitions underneath {}", rootDirectory); final List cds = Lists.newArrayList(); try { Set options = new HashSet<>(); @@ -133,7 +134,7 @@ public class CorePropertiesLocator implements CoresLocator { public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { if (file.getFileName().toString().equals(PROPERTIES_FILENAME)) { CoreDescriptor cd = buildCoreDescriptor(file, cc); - logger.info("Found core {} in {}", cd.getName(), cd.getInstanceDir()); + logger.debug("Found core {} in {}", cd.getName(), cd.getInstanceDir()); cds.add(cd); return FileVisitResult.SKIP_SIBLINGS; } @@ -155,7 +156,10 @@ public class CorePropertiesLocator implements CoresLocator { } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Couldn't walk file tree under " + this.rootDirectory, e); } - logger.info("Found {} core definitions", cds.size()); + logger.info("Found {} core definitions underneath {}", cds.size(), rootDirectory); + if (cds.size() > 0) { + logger.info("Cores are: {}", cds.stream().map(CoreDescriptor::getName).collect(Collectors.toList())); + } return cds; } diff --git a/solr/core/src/java/org/apache/solr/core/JmxMonitoredMap.java b/solr/core/src/java/org/apache/solr/core/JmxMonitoredMap.java index 57bde93decd..a5a27dc24d8 100644 --- a/solr/core/src/java/org/apache/solr/core/JmxMonitoredMap.java +++ b/solr/core/src/java/org/apache/solr/core/JmxMonitoredMap.java @@ -73,7 +73,7 @@ import static org.apache.solr.common.params.CommonParams.NAME; */ public class JmxMonitoredMap extends ConcurrentHashMap { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); // set to true to use cached statistics NamedLists between getMBeanInfo calls to work // around over calling getStatistics on MBeanInfos when iterating over all attributes (SOLR-6586) @@ -108,11 +108,11 @@ public class JmxMonitoredMap extends } if (servers == null || servers.isEmpty()) { - LOG.info("No JMX servers found, not exposing Solr information with JMX."); + log.debug("No JMX servers found, not exposing Solr information with JMX."); return; } server = servers.get(0); - LOG.info("JMX monitoring is enabled. Adding Solr mbeans to JMX Server: " + log.info("JMX monitoring is enabled. Adding Solr mbeans to JMX Server: " + server); } else { try { @@ -122,7 +122,7 @@ public class JmxMonitoredMap extends .newJMXConnectorServer(new JMXServiceURL(jmxConfig.serviceUrl), null, server); connector.start(); - LOG.info("JMX monitoring is enabled at " + jmxConfig.serviceUrl); + log.info("JMX monitoring is enabled at " + jmxConfig.serviceUrl); } catch (Exception e) { // Release the reference server = null; @@ -145,7 +145,7 @@ public class JmxMonitoredMap extends ObjectName instance = new ObjectName(jmxRootName + ":*"); objectNames = server.queryNames(instance, exp); } catch (Exception e) { - LOG.warn("Exception querying for mbeans", e); + log.warn("Exception querying for mbeans", e); } if (objectNames != null) { @@ -153,7 +153,7 @@ public class JmxMonitoredMap extends try { server.unregisterMBean(name); } catch (Exception e) { - LOG.warn("Exception un-registering mbean {}", name, e); + log.warn("Exception un-registering mbean {}", name, e); } } } @@ -181,7 +181,7 @@ public class JmxMonitoredMap extends SolrDynamicMBean mbean = new SolrDynamicMBean(coreHashCode, infoBean, useCachedStatsBetweenGetMBeanInfoCalls); server.registerMBean(mbean, name); } catch (Exception e) { - LOG.warn( "Failed to register info bean: " + key, e); + log.warn( "Failed to register info bean: " + key, e); } } @@ -201,7 +201,7 @@ public class JmxMonitoredMap extends try { unregister((String) key, infoBean); } catch (RuntimeException e) { - LOG.warn( "Failed to unregister info bean: " + key, e); + log.warn( "Failed to unregister info bean: " + key, e); } } return super.remove(key); @@ -319,7 +319,7 @@ public class JmxMonitoredMap extends } catch (Exception e) { // don't log issue if the core is closing if (!(SolrException.getRootCause(e) instanceof AlreadyClosedException)) - LOG.warn("Could not getStatistics on info bean {}", infoBean.getName(), e); + log.warn("Could not getStatistics on info bean {}", infoBean.getName(), e); } MBeanAttributeInfo[] attrInfoArr = attrInfoList @@ -395,7 +395,7 @@ public class JmxMonitoredMap extends try { list.add(new Attribute(attribute, getAttribute(attribute))); } catch (Exception e) { - LOG.warn("Could not get attribute " + attribute); + log.warn("Could not get attribute " + attribute); } } diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java index 412bd9395f1..343f988a4ca 100644 --- a/solr/core/src/java/org/apache/solr/core/PluginBag.java +++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -116,10 +117,10 @@ public class PluginBag implements AutoCloseable { PluginHolder createPlugin(PluginInfo info) { if ("true".equals(String.valueOf(info.attributes.get("runtimeLib")))) { - log.info(" {} : '{}' created with runtimeLib=true ", meta.getCleanTag(), info.name); + log.debug(" {} : '{}' created with runtimeLib=true ", meta.getCleanTag(), info.name); return new LazyPluginHolder<>(meta, info, core, core.getMemClassLoader()); } else if ("lazy".equals(info.attributes.get("startup")) && meta.options.contains(SolrConfig.PluginOpts.LAZY)) { - log.info("{} : '{}' created with startup=lazy ", meta.getCleanTag(), info.name); + log.debug("{} : '{}' created with startup=lazy ", meta.getCleanTag(), info.name); return new LazyPluginHolder(meta, info, core, core.getResourceLoader()); } else { T inst = core.createInstance(info.className, (Class) meta.clazz, meta.getCleanTag(), null, core.getResourceLoader()); @@ -228,6 +229,10 @@ public class PluginBag implements AutoCloseable { PluginHolder old = put(name, o); if (old != null) log.warn("Multiple entries of {} with name {}", meta.getCleanTag(), name); } + if (infos.size() > 0) { // Aggregate logging + log.info("[{}] Initialized {} plugins of type {}: {}", solrCore.getName(), infos.size(), meta.getCleanTag(), + infos.stream().map(i -> i.name).collect(Collectors.toList())); + } for (Map.Entry e : defaults.entrySet()) { if (!contains(e.getKey())) { put(e.getKey(), new PluginHolder(null, e.getValue())); diff --git a/solr/core/src/java/org/apache/solr/core/SchemaCodecFactory.java b/solr/core/src/java/org/apache/solr/core/SchemaCodecFactory.java index c575ecbca10..2b69d513b01 100644 --- a/solr/core/src/java/org/apache/solr/core/SchemaCodecFactory.java +++ b/solr/core/src/java/org/apache/solr/core/SchemaCodecFactory.java @@ -86,10 +86,10 @@ public class SchemaCodecFactory extends CodecFactory implements SolrCoreAware { "Invalid compressionMode: '" + compressionModeStr + "'. Value must be one of " + Arrays.toString(Mode.values())); } - log.info("Using compressionMode: " + compressionMode); + log.debug("Using compressionMode: " + compressionMode); } else { compressionMode = SOLR_DEFAULT_COMPRESSION_MODE; - log.info("Using default compressionMode: " + compressionMode); + log.debug("Using default compressionMode: " + compressionMode); } codec = new Lucene62Codec(compressionMode) { @Override diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java index 0c9b96e0819..a661f6c7cb0 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java @@ -228,7 +228,7 @@ public class SolrConfig extends Config implements MapSerializable { indexConfig = new SolrIndexConfig(this, "indexConfig", null); booleanQueryMaxClauseCount = getInt("query/maxBooleanClauses", BooleanQuery.getMaxClauseCount()); - log.info("Using Lucene MatchVersion: " + luceneMatchVersion); + log.info("Using Lucene MatchVersion: {}", luceneMatchVersion); // Warn about deprecated / discontinued parameters // boolToFilterOptimizer has had no effect since 3.1 @@ -327,7 +327,7 @@ public class SolrConfig extends Config implements MapSerializable { } solrRequestParsers = new SolrRequestParsers(this); - log.info("Loaded SolrConfig: " + name); + log.info("Loaded SolrConfig: {}", name); } public static final List plugins = ImmutableList.builder() @@ -409,7 +409,7 @@ public class SolrConfig extends Config implements MapSerializable { int version = 0; // will be always 0 for file based resourceLoader if (in instanceof ZkSolrResourceLoader.ZkByteArrayInputStream) { version = ((ZkSolrResourceLoader.ZkByteArrayInputStream) in).getStat().getVersion(); - log.info("config overlay loaded . version : {} ", version); + log.debug("Config overlay loaded. version : {} ", version); } isr = new InputStreamReader(in, StandardCharsets.UTF_8); Map m = (Map) ObjectBuilder.getVal(new JSONParser(isr)); @@ -750,7 +750,7 @@ public class SolrConfig extends Config implements MapSerializable { NodeList nodes = (NodeList) evaluate("lib", XPathConstants.NODESET); if (nodes == null || nodes.getLength() == 0) return; - log.info("Adding specified lib dirs to ClassLoader"); + log.debug("Adding specified lib dirs to ClassLoader"); SolrResourceLoader loader = getResourceLoader(); List urls = new ArrayList<>(); @@ -931,7 +931,7 @@ public class SolrConfig extends Config implements MapSerializable { public RequestParams refreshRequestParams() { requestParams = RequestParams.getFreshRequestParams(getResourceLoader(), requestParams); - log.info("current version of requestparams : {}", requestParams.getZnodeVersion()); + log.debug("current version of requestparams : {}", requestParams.getZnodeVersion()); return requestParams; } diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index c837fba9956..75d394ab44c 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -439,11 +439,11 @@ public final class SolrCore implements SolrInfoMBean, Closeable { if ("firstSearcher".equals(event)) { SolrEventListener obj = createInitInstance(info, clazz, label, null); firstSearcherListeners.add(obj); - log.info("[{}] Added SolrEventListener for firstSearcher: [{}]", logid, obj); + log.debug("[{}] Added SolrEventListener for firstSearcher: [{}]", logid, obj); } else if ("newSearcher".equals(event)) { SolrEventListener obj = createInitInstance(info, clazz, label, null); newSearcherListeners.add(obj); - log.info("[{}] Added SolrEventListener for newSearcher: [{}]", logid, obj); + log.debug("[{}] Added SolrEventListener for newSearcher: [{}]", logid, obj); } } } @@ -521,13 +521,13 @@ public final class SolrCore implements SolrInfoMBean, Closeable { final PluginInfo info = solrConfig.getPluginInfo(DirectoryFactory.class.getName()); final DirectoryFactory dirFactory; if (info != null) { - log.info(info.className); + log.debug(info.className); dirFactory = getResourceLoader().newInstance(info.className, DirectoryFactory.class); // allow DirectoryFactory instances to access the CoreContainer dirFactory.initCoreContainer(getCoreDescriptor().getCoreContainer()); dirFactory.init(info.initArgs); } else { - log.info("solr.NRTCachingDirectoryFactory"); + log.debug("solr.NRTCachingDirectoryFactory"); dirFactory = new NRTCachingDirectoryFactory(); dirFactory.initCoreContainer(getCoreDescriptor().getCoreContainer()); } @@ -851,7 +851,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { log.debug("Registering JMX bean [{}] from directory factory.", bean.getName()); // Not worried about concurrency, so no reason to use putIfAbsent if (infoRegistry.containsKey(bean.getName())){ - log.info("Ignoring JMX bean [{}] due to name conflict.", bean.getName()); + log.debug("Ignoring JMX bean [{}] due to name conflict.", bean.getName()); } else { infoRegistry.put(bean.getName(), bean); } @@ -959,7 +959,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { if (config.jmxConfig.enabled) { return new JmxMonitoredMap(name, String.valueOf(this.hashCode()), config.jmxConfig); } else { - log.info("JMX monitoring not detected for core: " + name); + log.debug("JMX monitoring not detected for core: " + name); return new ConcurrentHashMap<>(); } } @@ -1074,9 +1074,9 @@ public final class SolrCore implements SolrInfoMBean, Closeable { if (pluginInfo != null && pluginInfo.className != null && pluginInfo.className.length() > 0) { cache = createInitInstance(pluginInfo, StatsCache.class, null, LocalStatsCache.class.getName()); - log.info("Using statsCache impl: " + cache.getClass().getName()); + log.debug("Using statsCache impl: " + cache.getClass().getName()); } else { - log.info("Using default statsCache cache: " + LocalStatsCache.class.getName()); + log.debug("Using default statsCache cache: " + LocalStatsCache.class.getName()); cache = new LocalStatsCache(); } return cache; @@ -1099,7 +1099,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { def = map.get(null); } if (def == null) { - log.info("no updateRequestProcessorChain defined as default, creating implicit default"); + log.debug("no updateRequestProcessorChain defined as default, creating implicit default"); // construct the default chain UpdateRequestProcessorFactory[] factories = new UpdateRequestProcessorFactory[]{ new LogUpdateProcessorFactory(), @@ -1645,7 +1645,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { // but log a message about it to minimize confusion newestSearcher.incref(); - log.info("SolrIndexSearcher has not changed - not re-opening: " + newestSearcher.get().getName()); + log.debug("SolrIndexSearcher has not changed - not re-opening: " + newestSearcher.get().getName()); return newestSearcher; } // ELSE: open a new searcher against the old reader... @@ -2633,7 +2633,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { return false; } if (stat.getVersion() > currentVersion) { - log.info(zkPath+" is stale will need an update from {} to {}", currentVersion,stat.getVersion()); + log.debug(zkPath+" is stale will need an update from {} to {}", currentVersion,stat.getVersion()); return true; } return false; @@ -2654,7 +2654,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { final String coreName = getName(); if (myDirFactory != null && myDataDir != null && myIndexDir != null) { Thread cleanupThread = new Thread(() -> { - log.info("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir); + log.debug("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir); try { myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir); } catch (Exception exc) { diff --git a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java index 238d38727c0..4bf25aa99b4 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java +++ b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java @@ -41,9 +41,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -89,6 +92,8 @@ public class SolrResourceLoader implements ResourceLoader,Closeable "update.processor.", "util.", "spelling.", "handler.component.", "handler.dataimport.", "spelling.suggest.", "spelling.suggest.fst.", "rest.schema.analysis.", "security.","handler.admin." }; + private static final java.lang.String SOLR_CORE_NAME = "solr.core.name"; + private static Set loggedOnce = new ConcurrentSkipListSet<>(); protected URLClassLoader classLoader; private final Path instanceDir; @@ -150,10 +155,10 @@ public class SolrResourceLoader implements ResourceLoader,Closeable public SolrResourceLoader(Path instanceDir, ClassLoader parent, Properties coreProperties) { if (instanceDir == null) { this.instanceDir = SolrResourceLoader.locateSolrHome().toAbsolutePath().normalize(); - log.info("new SolrResourceLoader for deduced Solr Home: '{}'", this.instanceDir); + log.debug("new SolrResourceLoader for deduced Solr Home: '{}'", this.instanceDir); } else{ this.instanceDir = instanceDir.toAbsolutePath().normalize(); - log.info("new SolrResourceLoader for directory: '{}'", this.instanceDir); + log.debug("new SolrResourceLoader for directory: '{}'", this.instanceDir); } if (parent == null) @@ -193,6 +198,12 @@ public class SolrResourceLoader implements ResourceLoader,Closeable if (newLoader != classLoader) { this.classLoader = newLoader; } + log.info("[{}] Added {} libs to classloader, from paths: {}", + getCoreProperties().getProperty(SOLR_CORE_NAME), urls.size(), urls.stream() + .map(u -> u.getPath().substring(0,u.getPath().lastIndexOf("/"))) + .sorted() + .distinct() + .collect(Collectors.toList())); } /** @@ -232,7 +243,7 @@ public class SolrResourceLoader implements ResourceLoader,Closeable allURLs.addAll(Arrays.asList(oldLoader.getURLs())); allURLs.addAll(urls); for (URL url : urls) { - log.info("Adding '{}' to classloader", url.toString()); + log.debug("Adding '{}' to classloader", url.toString()); } ClassLoader oldParent = oldLoader.getParent(); @@ -754,11 +765,11 @@ public class SolrResourceLoader implements ResourceLoader,Closeable try { Context c = new InitialContext(); home = (String)c.lookup("java:comp/env/"+project+"/home"); - log.info("Using JNDI solr.home: "+home ); + logOnceInfo("home_using_jndi", "Using JNDI solr.home: "+home ); } catch (NoInitialContextException e) { - log.info("JNDI not configured for "+project+" (NoInitialContextEx)"); + log.debug("JNDI not configured for "+project+" (NoInitialContextEx)"); } catch (NamingException e) { - log.info("No /"+project+"/home in JNDI"); + log.debug("No /"+project+"/home in JNDI"); } catch( RuntimeException ex ) { log.warn("Odd RuntimeException while testing for JNDI: " + ex.getMessage()); } @@ -768,18 +779,26 @@ public class SolrResourceLoader implements ResourceLoader,Closeable String prop = project + ".solr.home"; home = System.getProperty(prop); if( home != null ) { - log.info("using system property "+prop+": " + home ); + logOnceInfo("home_using_sysprop", "Using system property "+prop+": " + home ); } } // if all else fails, try if( home == null ) { home = project + '/'; - log.info(project + " home defaulted to '" + home + "' (could not find system property or JNDI)"); + logOnceInfo("home_default", project + " home defaulted to '" + home + "' (could not find system property or JNDI)"); } return Paths.get(home); } + // Logs a message only once per startup + private static void logOnceInfo(String key, String msg) { + if (!loggedOnce.contains(key)) { + loggedOnce.add(key); + log.info(msg); + } + } + /** * @return the instance path for this resource loader */ diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java index 26cbe215f70..54feddc3215 100644 --- a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java +++ b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java @@ -358,7 +358,7 @@ public class SolrSnapshotMetaDataManager { * Reads the snapshot meta-data information from the given {@link Directory}. */ private synchronized void loadFromSnapshotMetadataFile() throws IOException { - log.info("Loading from snapshot metadata file..."); + log.debug("Loading from snapshot metadata file..."); long genLoaded = -1; IOException ioe = null; List snapshotFiles = new ArrayList<>(); diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index bc5927fc336..d1e1ed5d468 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -139,7 +139,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. this.keepAliveTime = getParameter(args, MAX_THREAD_IDLE_TIME, keepAliveTime,sb); this.queueSize = getParameter(args, INIT_SIZE_OF_QUEUE, queueSize,sb); this.accessPolicy = getParameter(args, INIT_FAIRNESS_POLICY, accessPolicy,sb); - log.info("created with {}",sb); + log.debug("created with {}",sb); // magic sysprop to make tests reproducible: set by SolrTestCaseJ4. String v = System.getProperty("tests.shardhandler.randomSeed"); diff --git a/solr/core/src/java/org/apache/solr/handler/loader/XMLLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/XMLLoader.java index a75857cca48..d6a51705a31 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/XMLLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/XMLLoader.java @@ -111,7 +111,7 @@ public class XMLLoader extends ContentStreamLoader { xsltCacheLifetimeSeconds = XSLT_CACHE_DEFAULT; if(args != null) { xsltCacheLifetimeSeconds = args.getInt(XSLT_CACHE_PARAM,XSLT_CACHE_DEFAULT); - log.info("xsltCacheLifetimeSeconds=" + xsltCacheLifetimeSeconds); + log.debug("xsltCacheLifetimeSeconds=" + xsltCacheLifetimeSeconds); } return this; } diff --git a/solr/core/src/java/org/apache/solr/logging/LogWatcher.java b/solr/core/src/java/org/apache/solr/logging/LogWatcher.java index 3ba74217c12..c5105902826 100644 --- a/solr/core/src/java/org/apache/solr/logging/LogWatcher.java +++ b/solr/core/src/java/org/apache/solr/logging/LogWatcher.java @@ -127,7 +127,7 @@ public abstract class LogWatcher { public static LogWatcher newRegisteredLogWatcher(LogWatcherConfig config, SolrResourceLoader loader) { if (!config.isEnabled()) { - log.info("A LogWatcher is not enabled"); + log.debug("A LogWatcher is not enabled"); return null; } @@ -135,7 +135,7 @@ public abstract class LogWatcher { if (logWatcher != null) { if (config.getWatcherSize() > 0) { - log.info("Registering Log Listener [{}]", logWatcher.getName()); + log.debug("Registering Log Listener [{}]", logWatcher.getName()); logWatcher.registerListener(config.asListenerConfig()); } } @@ -150,7 +150,7 @@ public abstract class LogWatcher { try { slf4jImpl = StaticLoggerBinder.getSingleton().getLoggerFactoryClassStr(); - log.info("SLF4J impl is " + slf4jImpl); + log.debug("SLF4J impl is " + slf4jImpl); if (fname == null) { if ("org.slf4j.impl.Log4jLoggerFactory".equals(slf4jImpl)) { fname = "Log4j"; @@ -168,7 +168,7 @@ public abstract class LogWatcher { } if (fname == null) { - log.info("No LogWatcher configured"); + log.debug("No LogWatcher configured"); return null; } diff --git a/solr/core/src/java/org/apache/solr/schema/CurrencyField.java b/solr/core/src/java/org/apache/solr/schema/CurrencyField.java index f2dc3511013..787599a1ccb 100644 --- a/solr/core/src/java/org/apache/solr/schema/CurrencyField.java +++ b/solr/core/src/java/org/apache/solr/schema/CurrencyField.java @@ -787,7 +787,7 @@ class FileExchangeRateProvider implements ExchangeRateProvider { InputStream is = null; Map> tmpRates = new HashMap<>(); try { - log.info("Reloading exchange rates from file "+this.currencyConfigFile); + log.debug("Reloading exchange rates from file "+this.currencyConfigFile); is = loader.openResource(currencyConfigFile); javax.xml.parsers.DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); diff --git a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java index 2cec6d3e406..96ad09dc0a4 100644 --- a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java +++ b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java @@ -537,18 +537,20 @@ public class IndexSchema { throw new SolrException(ErrorCode.SERVER_ERROR, msg); } } - log.info("default search field in schema is "+defaultSearchFieldName); + log.info("[{}] default search field in schema is {}. WARNING: Deprecated, please use 'df' on request instead.", + loader.getCoreProperties().getProperty(SOLR_CORE_NAME), defaultSearchFieldName); } // /schema/solrQueryParser/@defaultOperator expression = stepsToPath(SCHEMA, SOLR_QUERY_PARSER, AT + DEFAULT_OPERATOR); node = (Node) xpath.evaluate(expression, document, XPathConstants.NODE); if (node==null) { - log.debug("using default query parser operator (OR)"); + log.debug("Default query parser operator not set in Schema"); } else { isExplicitQueryParserDefaultOperator = true; queryParserDefaultOperator=node.getNodeValue().trim(); - log.info("query parser default operator is "+queryParserDefaultOperator); + log.info("[{}] query parser default operator is {}. WARNING: Deprecated, please use 'q.op' on request instead.", + loader.getCoreProperties().getProperty(SOLR_CORE_NAME), queryParserDefaultOperator); } // /schema/uniqueKey/text() @@ -577,7 +579,8 @@ public class IndexSchema { } uniqueKeyFieldName=uniqueKeyField.getName(); uniqueKeyFieldType=uniqueKeyField.getType(); - log.info("unique key field: "+uniqueKeyFieldName); + log.info("[{}] unique key field: {}", + loader.getCoreProperties().getProperty(SOLR_CORE_NAME), uniqueKeyFieldName); // Unless the uniqueKeyField is marked 'required=false' then make sure it exists if( Boolean.FALSE != explicitRequiredProp.get( uniqueKeyFieldName ) ) { diff --git a/solr/core/src/java/org/apache/solr/schema/OpenExchangeRatesOrgProvider.java b/solr/core/src/java/org/apache/solr/schema/OpenExchangeRatesOrgProvider.java index 1a7bfe6af0a..2d16108d200 100644 --- a/solr/core/src/java/org/apache/solr/schema/OpenExchangeRatesOrgProvider.java +++ b/solr/core/src/java/org/apache/solr/schema/OpenExchangeRatesOrgProvider.java @@ -139,7 +139,7 @@ public class OpenExchangeRatesOrgProvider implements ExchangeRateProvider { public boolean reload() throws SolrException { InputStream ratesJsonStream = null; try { - log.info("Reloading exchange rates from "+ratesFileLocation); + log.debug("Reloading exchange rates from "+ratesFileLocation); try { ratesJsonStream = (new URL(ratesFileLocation)).openStream(); } catch (Exception e) { @@ -172,7 +172,7 @@ public class OpenExchangeRatesOrgProvider implements ExchangeRateProvider { refreshInterval = 60; log.warn("Specified refreshInterval was too small. Setting to 60 minutes which is the update rate of openexchangerates.org"); } - log.info("Initialized with rates="+ratesFileLocation+", refreshInterval="+refreshInterval+"."); + log.debug("Initialized with rates="+ratesFileLocation+", refreshInterval="+refreshInterval+"."); refreshIntervalSeconds = refreshInterval * 60; } catch (SolrException e1) { throw e1; diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index cda315424eb..f5fe4641764 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -117,6 +117,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { @Override public void init(FilterConfig config) throws ServletException { + log.trace("SolrDispatchFilter.init(): {}", this.getClass().getClassLoader()); String muteConsole = System.getProperty(SOLR_LOG_MUTECONSOLE); if (muteConsole != null && !Arrays.asList("false","0","off","no").contains(muteConsole.toLowerCase(Locale.ROOT))) { StartupLoggingUtils.muteConsole(); @@ -142,7 +143,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { this.cores = createCoreContainer(solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome), extraProperties); this.httpClient = cores.getUpdateShardHandler().getHttpClient(); - log.info("user.dir=" + System.getProperty("user.dir")); + log.debug("user.dir=" + System.getProperty("user.dir")); } catch( Throwable t ) { // catch this so our filter still works @@ -153,7 +154,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { } } - log.info("SolrDispatchFilter.init() done"); + log.trace("SolrDispatchFilter.init() done"); } /** diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java index 30e31cac6b1..35096e53224 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java @@ -72,7 +72,7 @@ public class UpdateShardHandler { clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, cfg.getMaxUpdateConnections()); clientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, cfg.getMaxUpdateConnectionsPerHost()); } - log.info("Created UpdateShardHandler HTTP client with params: {}", clientParams); + log.debug("Created UpdateShardHandler HTTP client with params: {}", clientParams); } public HttpClient getHttpClient() { diff --git a/solr/server/resources/log4j.properties b/solr/server/resources/log4j.properties index 672af4cb7a3..9d8eca0816a 100644 --- a/solr/server/resources/log4j.properties +++ b/solr/server/resources/log4j.properties @@ -19,6 +19,9 @@ log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (% log4j.logger.org.apache.zookeeper=WARN log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.eclipse.jetty.server=INFO +log4j.logger.org.eclipse.jetty.server.handler=WARN # set to INFO to enable infostream log messages log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF From 6365920a0e9ed3bf0b13b90955cd73535d495f9a Mon Sep 17 00:00:00 2001 From: Dennis Gove Date: Tue, 13 Sep 2016 10:50:52 -0400 Subject: [PATCH 7/7] SOLR-8487: Adds CommitStream to support sending commits to a collection being updated --- solr/CHANGES.txt | 2 + .../apache/solr/handler/StreamHandler.java | 3 +- .../client/solrj/io/stream/CommitStream.java | 260 ++++++++++++++ .../client/solrj/io/stream/UpdateStream.java | 3 +- .../solrj/io/stream/expr/StreamFactory.java | 35 ++ .../solrj/io/stream/StreamExpressionTest.java | 340 ++++++++++++++++++ 6 files changed, 641 insertions(+), 2 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 94f30c787d8..5138552161b 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -90,6 +90,8 @@ New Features * SOLR-8186: Reduce logging to logs/solr--console.log when not running in foreground mode Show timestamp also in foreground log. Also removes some logging noise. (janhoy) +* SOLR-8487: Adds CommitStream to support sending commits to a collection being updated. (Dennis Gove) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index e97df34eb9d..d190e508884 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -105,6 +105,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("update", UpdateStream.class) .withFunctionName("jdbc", JDBCStream.class) .withFunctionName("topic", TopicStream.class) + .withFunctionName("commit", CommitStream.class) // decorator streams .withFunctionName("merge", MergeStream.class) @@ -128,7 +129,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("shortestPath", ShortestPathStream.class) .withFunctionName("gatherNodes", GatherNodesStream.class) .withFunctionName("select", SelectStream.class) - .withFunctionName("scoreNodes", ScoreNodesStream.class) + .withFunctionName("scoreNodes", ScoreNodesStream.class) // metrics .withFunctionName("min", MinMetric.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java new file mode 100644 index 00000000000..c075978a817 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends a commit message to a SolrCloud collection + */ +public class CommitStream extends TupleStream implements Expressible { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // Part of expression / passed in + private String collection; + private String zkHost; + private boolean waitFlush; + private boolean waitSearcher; + private boolean softCommit; + private int commitBatchSize; + private TupleStream tupleSource; + + private transient SolrClientCache clientCache; + private long docsSinceCommit; + + public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException { + + String collectionName = factory.getValueOperand(expression, 0); + String zkHost = findZkHost(factory, collectionName, expression); + int batchSize = factory.getIntOperand(expression, "batchSize", 0); + boolean waitFlush = factory.getBooleanOperand(expression, "waitFlush", false); + boolean waitSearcher = factory.getBooleanOperand(expression, "waitSearcher", false); + boolean softCommit = factory.getBooleanOperand(expression, "softCommit", false); + + if(null == collectionName){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); + } + if(null == zkHost){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + } + if(batchSize < 0){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize cannot be less than 0 but is '%d'",expression,batchSize)); + } + + //Extract underlying TupleStream. + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + if (1 != streamExpressions.size()) { + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); + } + StreamExpression sourceStreamExpression = streamExpressions.get(0); + + init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, batchSize, waitFlush, waitSearcher, softCommit); + } + + public CommitStream(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) throws IOException { + if (batchSize < 0) { + throw new IOException(String.format(Locale.ROOT,"batchSize '%d' cannot be less than 0.", batchSize)); + } + init(collectionName, tupleSource, zkHost, batchSize, waitFlush, waitSearcher, softCommit); + } + + private void init(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) { + this.collection = collectionName; + this.zkHost = zkHost; + this.commitBatchSize = batchSize; + this.waitFlush = waitFlush; + this.waitSearcher = waitSearcher; + this.softCommit = softCommit; + this.tupleSource = tupleSource; + } + + @Override + public void open() throws IOException { + tupleSource.open(); + clientCache = new SolrClientCache(); + docsSinceCommit = 0; + } + + @Override + public Tuple read() throws IOException { + + Tuple tuple = tupleSource.read(); + if(tuple.EOF){ + if(docsSinceCommit > 0){ + sendCommit(); + } + } + else{ + // if the read document contains field 'batchIndexed' then it's a summary + // document and we can update our count based on it's value. If not then + // just increment by 1 + if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){ + docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME)); + } + else{ + docsSinceCommit += 1; + } + + if(commitBatchSize > 0 && docsSinceCommit >= commitBatchSize){ + // if commitBatchSize == 0 then the tuple.EOF above will end up calling sendCommit() + sendCommit(); + } + } + + return tuple; + } + + private boolean isInteger(String string){ + try{ + Integer.parseInt(string); + return true; + } + catch(NumberFormatException e){ + return false; + } + } + + @Override + public void close() throws IOException { + clientCache.close(); + tupleSource.close(); + } + + @Override + public StreamComparator getStreamSort() { + return tupleSource.getStreamSort(); + } + + @Override + public List children() { + ArrayList sourceList = new ArrayList(1); + sourceList.add(tupleSource); + return sourceList; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + expression.addParameter(collection); + expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); + expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(commitBatchSize))); + expression.addParameter(new StreamExpressionNamedParameter("waitFlush", Boolean.toString(waitFlush))); + expression.addParameter(new StreamExpressionNamedParameter("waitSearcher", Boolean.toString(waitSearcher))); + expression.addParameter(new StreamExpressionNamedParameter("softCommit", Boolean.toString(softCommit))); + + if(includeStreams){ + if(tupleSource instanceof Expressible){ + expression.addParameter(((Expressible)tupleSource).toExpression(factory)); + } else { + throw new IOException("This CommitStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + else{ + expression.addParameter(""); + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + // A commit stream is backward wrt the order in the explanation. This stream is the "child" + // while the collection we're committing to is the parent. + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore"); + + explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + explanation.setImplementingClass("Solr/Lucene"); + explanation.setExpressionType(ExpressionType.DATASTORE); + explanation.setExpression("Commit into " + collection); + + // child is a stream so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId().toString()); + child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass()))); + child.setImplementingClass(getClass().getName()); + child.setExpressionType(ExpressionType.STREAM_DECORATOR); + child.setExpression(toExpression(factory, false).toString()); + child.addChild(tupleSource.toExplanation(factory)); + + explanation.addChild(child); + + return explanation; + } + + @Override + public void setStreamContext(StreamContext context) { + if(null != context.getSolrClientCache()){ + this.clientCache = context.getSolrClientCache(); + // this overrides the one created in open + } + + this.tupleSource.setStreamContext(context); + } + + private String findZkHost(StreamFactory factory, String collectionName, StreamExpression expression) { + StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); + if(null == zkHostExpression){ + String zkHost = factory.getCollectionZkHost(collectionName); + if(zkHost == null) { + return factory.getDefaultZkHost(); + } else { + return zkHost; + } + } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ + return ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); + } + + return null; + } + + private void sendCommit() throws IOException { + + try { + clientCache.getCloudSolrClient(zkHost).commit(collection, waitFlush, waitSearcher, softCommit); + } catch (SolrServerException | IOException e) { + LOG.warn(String.format(Locale.ROOT, "Unable to commit documents to collection '%s' due to unexpected error.", collection), e); + String className = e.getClass().getName(); + String message = e.getMessage(); + throw new IOException(String.format(Locale.ROOT,"Unexpected error when committing documents to collection %s- %s:%s", collection, className, message)); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java index 5b1aae76b90..55291bf9ed9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; public class UpdateStream extends TupleStream implements Expressible { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static String BATCH_INDEXED_FIELD_NAME = "batchIndexed"; // field name in summary tuple for #docs updated in batch private String collection; private String zkHost; private int updateBatchSize; @@ -307,7 +308,7 @@ public class UpdateStream extends TupleStream implements Expressible { Map m = new HashMap(); this.totalDocsIndex += batchSize; ++batchNumber; - m.put("batchIndexed", batchSize); + m.put(BATCH_INDEXED_FIELD_NAME, batchSize); m.put("totalIndexed", this.totalDocsIndex); m.put("batchNumber", batchNumber); if(coreName != null) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index 7f574fd645e..9008bc675e8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -174,6 +174,41 @@ public class StreamFactory implements Serializable { return matchingStreamExpressions; } + public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{ + StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); + + if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName)); + } + String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); + try{ + return Integer.parseInt(nStr); + } + catch(NumberFormatException e){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr)); + } + } + + public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{ + StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); + + if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName)); + } + String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); + return Boolean.parseBoolean(nStr); + } + + public TupleStream constructStream(String expressionClause) throws IOException { return constructStream(StreamExpressionParser.parse(expressionClause)); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 7c3a3a6e0c0..ad7b9f44763 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -3034,6 +3034,346 @@ public class StreamExpressionTest extends SolrCloudTestCase { CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); } + //////////////////////////////////////////// + @Test + public void testCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class); + + //Copy all docs to destinationCollection + expression = StreamExpressionParser.parse("commit(destinationCollection, batchSize=2, update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\")))"); + stream = factory.constructStream(expression); + List tuples = getTuples(stream); + + //Ensure that all CommitStream tuples indicate the correct number of copied/indexed docs + assert(tuples.size() == 1); + t = tuples.get(0); + assert(t.EOF == false); + assertEquals(5, t.get("batchIndexed")); + + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(destinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient()); + } + + @Test + public void testParallelCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + String zkHost = cluster.getZkServer().getZkAddress(); + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("parallelDestinationCollection", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class) + .withFunctionName("parallel", ParallelStream.class); + + //Copy all docs to destinationCollection + String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))"; + TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")"); + List tuples = getTuples(parallelUpdateStream); + + //Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs + long count = 0; + + for(Tuple tuple : tuples) { + count+=tuple.getLong("batchIndexed"); + } + + assert(count == 5); + + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(parallelDestinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("parallelDestinationCollection").process(cluster.getSolrClient()); + } + + @Test + public void testParallelDaemonCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + String zkHost = cluster.getZkServer().getZkAddress(); + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class) + .withFunctionName("parallel", ParallelStream.class) + .withFunctionName("daemon", DaemonStream.class); + + //Copy all docs to destinationCollection + String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")"; + TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")"); + List tuples = getTuples(parallelUpdateStream); + assert(tuples.size() == 2); + + //Lets sleep long enough for daemon updates to run. + //Lets stop the daemons + ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list")); + + int workersComplete = 0; + for(JettySolrRunner jetty : cluster.getJettySolrRunners()) { + int iterations = 0; + INNER: + while(iterations == 0) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + if (tupleResponse.EOF) { + solrStream.close(); + break INNER; + } else { + long l = tupleResponse.getLong("iterations"); + if(l > 0) { + ++workersComplete; + } else { + try { + Thread.sleep(1000); + } catch(Exception e) { + } + } + iterations = (int) l; + solrStream.close(); + } + } + } + + assertEquals(cluster.getJettySolrRunners().size(), workersComplete); + + //Lets stop the daemons + sParams = new ModifiableSolrParams(); + sParams.set(CommonParams.QT, "/stream"); + sParams.set("action", "stop"); + sParams.set("id", "test"); + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + solrStream.close(); + } + + sParams = new ModifiableSolrParams(); + sParams.set(CommonParams.QT, "/stream"); + sParams.set("action", "list"); + + workersComplete = 0; + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + long stopTime = 0; + INNER: + while(stopTime == 0) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + if (tupleResponse.EOF) { + solrStream.close(); + break INNER; + } else { + stopTime = tupleResponse.getLong("stopTime"); + if (stopTime > 0) { + ++workersComplete; + } else { + try { + Thread.sleep(1000); + } catch(Exception e) { + + } + } + solrStream.close(); + } + } + } + + assertEquals(cluster.getJettySolrRunners().size(), workersComplete); + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); + } + //////////////////////////////////////////// + @Test public void testIntersectStream() throws Exception {