diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d7e85447eee..e00bf6bf555 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -522,6 +522,9 @@ Bug Fixes
 * SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
   (Jan Høydahl, Steve Rowe)
 
+* SOLR-8575: Fix HDFSLogReader replay status numbers, a performance bug where we can reopen
+  FSDataInputStream much too often, and an hdfs tlog data integrity bug.
+  (Mark Miller, Patrick Dvorack, yonik)
 
 * SOLR-8651: The commitWithin parameter is not passed on for deleteById in UpdateRequest in distributed queries
   (Jessica Cheng Mallet via Erick Erickson)
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 3db65c61887..7ccbb95cc0f 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -360,8 +360,15 @@ public class HdfsTransactionLog extends TransactionLog {
       super();
       incref();
       try {
+
+        synchronized (HdfsTransactionLog.this) {
+          fos.flushBuffer();
+          sz = fos.size();
+        }
+
+        tlogOutStream.hflush();
+
         FSDataInputStream fdis = fs.open(tlogFile);
-        sz = fs.getFileStatus(tlogFile).getLen();
         fis = new FSDataFastInputStream(fdis, startingPos);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -384,22 +391,23 @@ public class HdfsTransactionLog extends TransactionLog {
         if (pos >= fos.size()) {
           return null;
         }
-
-        fos.flushBuffer();
       }
 
       // we actually need a new reader to
       // see if any data was added by the writer
-      if (fis.position() >= sz) {
-        fis.close();
-        tlogOutStream.hflush();
-        try {
-          FSDataInputStream fdis = fs.open(tlogFile);
-          fis = new FSDataFastInputStream(fdis, pos);
-          sz = fs.getFileStatus(tlogFile).getLen();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+      if (pos >= sz) {
+        log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
+
+        synchronized (HdfsTransactionLog.this) {
+          fos.flushBuffer();
+          sz = fos.size();
         }
+
+        tlogOutStream.hflush();
+        fis.close();
+
+        FSDataInputStream fdis = fs.open(tlogFile);
+        fis = new FSDataFastInputStream(fdis, pos);
       }
 
       if (pos == 0) {
@@ -446,7 +454,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
     @Override
     public long currentSize() {
-      return sz;
+      return fos.size();
     }
 
   }
@@ -604,5 +612,3 @@ class FSDataFastInputStream extends FastInputStream {
     return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
   }
 }
-
-
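Illustrative sketch, not part of the patch: the HDFSLogReader changes above flush the writer and snapshot its size while synchronized on the transaction log, then reopen an input stream only once the reader has consumed everything up to that snapshot, instead of asking the filesystem for the file length on every call. The plain local-file Java below shows the same reopen-only-when-caught-up pattern; all names (ReopenOnCatchUpSketch, logLock, bytesWritten, cachedSize) are hypothetical, not Solr code.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReopenOnCatchUpSketch {
  private final Object logLock = new Object();
  private final File file;
  private final BufferedOutputStream out;
  private long bytesWritten; // size tracked by the writer (plays the role of fos.size())
  private long cachedSize;   // reader's snapshot of the flushed size (plays the role of sz)
  private long pos;          // reader position
  private InputStream in;

  public ReopenOnCatchUpSketch(File file) throws IOException {
    this.file = file;
    this.out = new BufferedOutputStream(new FileOutputStream(file));
  }

  public void append(byte[] data) throws IOException {
    synchronized (logLock) {
      out.write(data);
      bytesWritten += data.length;
    }
  }

  /** Returns the next byte, or -1 if the reader has caught up with the writer. */
  public int next() throws IOException {
    if (in == null || pos >= cachedSize) {
      synchronized (logLock) {
        out.flush();              // make buffered writes visible to readers
        cachedSize = bytesWritten; // snapshot the flushed size under the same lock the writer uses
      }
      if (pos >= cachedSize) {
        return -1;                // nothing new, and no stream was reopened
      }
      if (in != null) {
        in.close();
      }
      in = new FileInputStream(file); // reopen only when the old stream is fully consumed
      long skipped = in.skip(pos);
      if (skipped != pos) throw new IOException("could not seek to " + pos);
    }
    int b = in.read();
    if (b != -1) pos++;
    return b;
  }
}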
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 78c30b95d29..4b6f7c290fb 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -61,6 +61,7 @@ import org.apache.solr.update.processor.UpdateRequestProcessorChain;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.TestInjection;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1324,7 +1325,7 @@ public class UpdateLog implements PluginInfoInitialized {
           loglog.info(
               "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
               translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
-              Math.round(cpos / (double) csize * 100.));
+              Math.floor(cpos / (double) csize * 100.));
         }
       }
@@ -1439,6 +1440,7 @@ public class UpdateLog implements PluginInfoInitialized {
             loglog.warn("REPLAY_ERR: Exception replaying log", ex);  // something wrong with the request?
           }
+          TestInjection.injectUpdateLogReplayRandomPause();
         }
 
         CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 6c3b0945b45..d0e72db79bf 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -318,6 +318,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     // if we are in zk mode...
     if (zkEnabled) {
+      assert TestInjection.injectUpdateRandomPause();
+
       if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
         isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
         forwardToLeader = false;
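Illustrative sketch, not part of the patch: the new call above is wrapped in assert because the hook always returns true, so the assertion can never fail; with assertions disabled (the default outside tests) the assert statement is not evaluated at all, so production update handling pays nothing for it. The class and method below are hypothetical stand-ins for TestInjection, whose real changes follow next.

public class AssertHookSketch {

  // Tests set this to something like "true:10"; production leaves it null.
  public static volatile String updateRandomPause = null;

  public static boolean injectUpdateRandomPause() {
    if (updateRandomPause != null) {
      // ...decide whether to sleep for a random time, as TestInjection does...
    }
    return true; // always true, so `assert injectUpdateRandomPause();` never trips
  }

  void processUpdate() {
    assert injectUpdateRandomPause(); // skipped entirely unless assertions (-ea) are enabled
    // ...real update handling continues here...
  }
}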
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 557024d35b1..5897771189e 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.util;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashSet;
 
@@ -31,10 +32,14 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Allows random faults to be injected in running code during test runs.
+ *
+ * Set static strings to "true" or "false" or "true:60" for true 60% of the time.
  */
 public class TestInjection {
 
@@ -46,6 +51,8 @@ public class TestInjection {
 
   }
 
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final Pattern ENABLED_PERCENT = Pattern.compile("(true|false)(?:\\:(\\d+))?$", Pattern.CASE_INSENSITIVE);
 
   private static final Random RANDOM;
@@ -67,16 +74,20 @@ public class TestInjection {
   public static String failUpdateRequests = null;
 
   public static String nonExistentCoreExceptionAfterUnload = null;
+
+  public static String updateLogReplayRandomPause = null;
+
+  public static String updateRandomPause = null;
 
   private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
-
-
   public static void reset() {
     nonGracefullClose = null;
     failReplicaRequests = null;
     failUpdateRequests = null;
     nonExistentCoreExceptionAfterUnload = null;
+    updateLogReplayRandomPause = null;
+    updateRandomPause = null;
 
     for (Timer timer : timers) {
       timer.cancel();
@@ -160,6 +171,44 @@ public class TestInjection {
     return true;
   }
 
+  public static boolean injectUpdateLogReplayRandomPause() {
+    if (updateLogReplayRandomPause != null) {
+      Pair<Boolean,Integer> pair = parseValue(updateLogReplayRandomPause);
+      boolean enabled = pair.getKey();
+      int chanceIn100 = pair.getValue();
+      if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
+        long rndTime = RANDOM.nextInt(1000);
+        log.info("inject random log replay delay of {}ms", rndTime);
+        try {
+          Thread.sleep(rndTime);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    return true;
+  }
+
+  public static boolean injectUpdateRandomPause() {
+    if (updateRandomPause != null) {
+      Pair<Boolean,Integer> pair = parseValue(updateRandomPause);
+      boolean enabled = pair.getKey();
+      int chanceIn100 = pair.getValue();
+      if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
+        long rndTime = RANDOM.nextInt(1000);
+        log.info("inject random update delay of {}ms", rndTime);
+        try {
+          Thread.sleep(rndTime);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    return true;
+  }
+
   private static Pair<Boolean,Integer> parseValue(String raw) {
     Matcher m = ENABLED_PERCENT.matcher(raw);
     if (!m.matches()) throw new RuntimeException("No match, probably bad syntax: " + raw);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index 9584838f10d..8cc80d9c843 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -46,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
 @ThreadLeakLingering(linger = 60000)
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
 public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
   private static final int FAIL_TOLERANCE = 60;
diff --git a/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 00000000000..2fd76208297
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.util.TestInjection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@Slow
+@Nightly
+@SuppressSSL
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
+public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {
+
+  private List<StoppableIndexingThread> threads;
+
+  public TlogReplayBufferedWhileIndexingTest() throws Exception {
+    super();
+    sliceCount = 1;
+    fixShardCount(2);
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  @BeforeClass
+  public static void beforeRestartWhileUpdatingTest() throws Exception {
+    System.setProperty("leaderVoteWait", "300000");
+    System.setProperty("solr.autoCommit.maxTime", "10000");
+    System.setProperty("solr.autoSoftCommit.maxTime", "3000");
+    TestInjection.updateLogReplayRandomPause = "true:10";
+    TestInjection.updateRandomPause = "true:10";
+    if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
+  }
+
+  @AfterClass
+  public static void afterRestartWhileUpdatingTest() {
+    System.clearProperty("leaderVoteWait");
+    System.clearProperty("solr.autoCommit.maxTime");
+    System.clearProperty("solr.autoSoftCommit.maxTime");
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForRecoveriesToFinish(false);
+
+    int numThreads = 3;
+
+    threads = new ArrayList<>(numThreads);
+
+    ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
+    allJetty.addAll(jettys);
+    allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
+    assert allJetty.size() == 1 : allJetty.size();
+    ChaosMonkey.stop(allJetty.get(0));
+
+    StoppableIndexingThread indexThread;
+    for (int i = 0; i < numThreads; i++) {
+      boolean pauseBetweenUpdates = random().nextBoolean();
+      int batchSize = random().nextInt(4) + 1;
+      indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, 900, batchSize, pauseBetweenUpdates);
+      threads.add(indexThread);
+      indexThread.start();
+    }
+
+    Thread.sleep(2000);
+
+    ChaosMonkey.start(allJetty.get(0));
+
+    Thread.sleep(45000);
+
+    waitForThingsToLevelOut(440);
+
+    Thread.sleep(2000);
+
+    waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);
+
+    for (StoppableIndexingThread thread : threads) {
+      thread.safeStop();
+    }
+
+    waitForThingsToLevelOut(30);
+
+    checkShardConsistency(false, false);
+
+  }
+
+  @Override
+  protected void indexDoc(SolrInputDocument doc) throws IOException,
+      SolrServerException {
+    cloudClient.add(doc);
+  }
+
+
+  @Override
+  public void distribTearDown() throws Exception {
+    // make sure threads have been stopped...
+    if (threads != null) {
+      for (StoppableIndexingThread thread : threads) {
+        thread.safeStop();
+      }
+
+      for (StoppableIndexingThread thread : threads) {
+        thread.join();
+      }
+    }
+
+    super.distribTearDown();
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeyNothingIsSafeTest.java
index 8399d67e2e7..47a9b98ff40 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeyNothingIsSafeTest.java
@@ -22,6 +22,7 @@ import com.carrotsearch.randomizedtesting.annotations.Nightly;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
 import org.apache.solr.cloud.ChaosMonkeyNothingIsSafeTest;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.junit.AfterClass;
@@ -32,6 +33,7 @@ import org.junit.BeforeClass;
 @ThreadLeakFilters(defaultFilters = true, filters = {
     BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
 })
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
 public class HdfsChaosMonkeyNothingIsSafeTest extends ChaosMonkeyNothingIsSafeTest {
 
   private static MiniDFSCluster dfsCluster;
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 00000000000..4ca59fe1e67
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+@Slow
+@Nightly
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
+})
+public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {
+
+  public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
+    super();
+  }
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+    System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+
+  @Override
+  protected String getDataDir(String dataDir) throws IOException {
+    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
+  }
+
+}
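Illustrative sketch, not part of the patch: the javadoc added to TestInjection documents the injection value format ("true", "false", or "true:60" for true 60% of the time), and the new tests above set values like "true:10". The standalone snippet below parses that format with the same regex as the ENABLED_PERCENT pattern shown in the patch; treating a missing percentage as 100 is an assumption here, not copied from the Solr source.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InjectionValueFormatSketch {

  // Same regex as TestInjection.ENABLED_PERCENT in the diff above.
  private static final Pattern ENABLED_PERCENT =
      Pattern.compile("(true|false)(?:\\:(\\d+))?$", Pattern.CASE_INSENSITIVE);

  public static void main(String[] args) {
    for (String raw : new String[] {"true", "false", "true:60", "true:10"}) {
      Matcher m = ENABLED_PERCENT.matcher(raw);
      if (!m.matches()) {
        throw new RuntimeException("No match, probably bad syntax: " + raw);
      }
      boolean enabled = Boolean.parseBoolean(m.group(1));
      // Assumption: no explicit percentage means the fault fires every time it is enabled.
      int chanceIn100 = m.group(2) == null ? 100 : Integer.parseInt(m.group(2));
      System.out.println(raw + " -> enabled=" + enabled + ", chanceIn100=" + chanceIn100);
    }
  }
}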