diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c396d393873..348cf017373 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -445,6 +445,8 @@ Bug Fixes
 * SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
   (Jan Høydahl, Steve Rowe)
 
+* SOLR-8575: Fix HDFSLogReader replay status numbers and a performance bug where we can reopen
+  FSDataInputStream too often. (Mark Miller, Patrick Dvorack)
 
 Optimizations
 ----------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 3db65c61887..bff3486b213 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -390,16 +390,18 @@ public class HdfsTransactionLog extends TransactionLog {
 
       // we actually need a new reader to
       // see if any data was added by the writer
-      if (fis.position() >= sz) {
+      if (pos >= sz) {
+        log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
+
+        synchronized (HdfsTransactionLog.this) {
+          sz = fos.size();
+        }
+
         fis.close();
         tlogOutStream.hflush();
-        try {
-          FSDataInputStream fdis = fs.open(tlogFile);
-          fis = new FSDataFastInputStream(fdis, pos);
-          sz = fs.getFileStatus(tlogFile).getLen();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+
+        FSDataInputStream fdis = fs.open(tlogFile);
+        fis = new FSDataFastInputStream(fdis, pos);
       }
 
       if (pos == 0) {
@@ -446,7 +448,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
     @Override
     public long currentSize() {
-      return sz;
+      return fos.size();
    }
 
   }
@@ -604,5 +606,3 @@ class FSDataFastInputStream extends FastInputStream {
     return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
   }
 }
-
-
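The HdfsTransactionLog hunk above fixes two things at once. The reader now refreshes its notion of the log size from the shared output stream (fos.size(), read while synchronized on the log, since the writer may be appending concurrently), and it closes and reopens the FSDataInputStream only once the caller has consumed everything up to that size, rather than reopening on every check. A rough local-file sketch of this reopen-only-when-caught-up pattern (TailingLogReader and all of its names are hypothetical, with RandomAccessFile standing in for FSDataInputStream; an illustration, not Solr code):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.file.Path;

    class TailingLogReader implements AutoCloseable {
      private final Path file;
      private RandomAccessFile in; // stands in for FSDataInputStream
      private long pos;            // bytes handed to the caller so far
      private long sz;             // last size learned from the writer

      TailingLogReader(Path file) throws IOException {
        this.file = file;
        this.in = new RandomAccessFile(file.toFile(), "r");
        this.sz = in.length();
      }

      // Returns -1 when no new data is visible yet.
      int read(byte[] buf) throws IOException {
        if (pos >= sz) {
          // Caught up: learn the new size and reopen exactly once,
          // instead of reopening on every call (the performance bug).
          sz = in.length(); // the patch reads fos.size() here, under the log's monitor
          in.close();
          in = new RandomAccessFile(file.toFile(), "r"); // HDFS needs a fresh stream to see appends
          in.seek(pos);
          if (pos >= sz) {
            return -1; // the writer added nothing new
          }
        }
        int n = in.read(buf, 0, (int) Math.min(buf.length, sz - pos));
        if (n > 0) {
          pos += n;
        }
        return n;
      }

      @Override
      public void close() throws IOException {
        in.close();
      }
    }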
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 78c30b95d29..214d9a50d0e 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1324,7 +1324,7 @@
 
       loglog.info(
           "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
           translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
-          Math.round(cpos / (double) csize * 100.));
+          Math.floor(cpos / (double) csize * 100.));
       }
     }
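The Math.round to Math.floor swap keeps the logged "% read" figure honest: rounding reports 100 while the tail of the log is still replaying, whereas flooring cannot reach 100 until the current position has genuinely caught up with the size. A trivial standalone demonstration (the numbers are made up):

    public class ReplayPercentDemo {
      public static void main(String[] args) {
        long cpos = 996;   // hypothetical current replay position
        long csize = 1000; // hypothetical current tlog size

        // round() already claims 100% with data still left to replay...
        System.out.println(Math.round(cpos / (double) csize * 100.)); // 100
        // ...while floor() stays at 99 until replay actually finishes.
        System.out.println(Math.floor(cpos / (double) csize * 100.)); // 99.0
      }
    }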
diff --git a/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 00000000000..5c03a6093d9
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,136 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@Slow
+@Nightly
+@SuppressSSL
+public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {
+
+  private List<StoppableIndexingThread> threads;
+
+  public TlogReplayBufferedWhileIndexingTest() throws Exception {
+    super();
+    sliceCount = 1;
+    fixShardCount(2);
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  @BeforeClass
+  public static void beforeRestartWhileUpdatingTest() throws Exception {
+    System.setProperty("leaderVoteWait", "300000");
+    System.setProperty("solr.autoCommit.maxTime", "10000");
+    System.setProperty("solr.autoSoftCommit.maxTime", "3000");
+    if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
+  }
+
+  @AfterClass
+  public static void afterRestartWhileUpdatingTest() {
+    System.clearProperty("leaderVoteWait");
+    System.clearProperty("solr.autoCommit.maxTime");
+    System.clearProperty("solr.autoSoftCommit.maxTime");
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForRecoveriesToFinish(false);
+
+    int numThreads = 1;
+
+    threads = new ArrayList<>(numThreads);
+
+    ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
+    allJetty.addAll(jettys);
+    allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
+    assert allJetty.size() == 1 : allJetty.size();
+    ChaosMonkey.stop(allJetty.get(0));
+
+    StoppableIndexingThread indexThread;
+    for (int i = 0; i < numThreads; i++) {
+      indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), false, 50000, 1, false);
+      threads.add(indexThread);
+      indexThread.start();
+    }
+
+    Thread.sleep(2000);
+
+    ChaosMonkey.start(allJetty.get(0));
+
+    Thread.sleep(45000);
+
+    waitForThingsToLevelOut(320);
+
+    Thread.sleep(2000);
+
+    waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);
+
+    for (StoppableIndexingThread thread : threads) {
+      thread.safeStop();
+      thread.safeStop();
+    }
+
+    waitForThingsToLevelOut(30);
+
+    checkShardConsistency(false, false);
+  }
+
+  @Override
+  protected void indexDoc(SolrInputDocument doc) throws IOException,
+      SolrServerException {
+    cloudClient.add(doc);
+  }
+
+
+  @Override
+  public void distribTearDown() throws Exception {
+    // make sure threads have been stopped...
+    if (threads != null) {
+      for (StoppableIndexingThread thread : threads) {
+        thread.safeStop();
+      }
+    }
+
+    super.distribTearDown();
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 00000000000..534bb903559
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,63 @@
+package org.apache.solr.cloud.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+@Slow
+@Nightly
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
+})
+public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {
+
+  public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
+    super();
+  }
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+    System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+
+  @Override
+  protected String getDataDir(String dataDir) throws IOException {
+    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
+  }
+
+}
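Both new tests are annotated @Slow and @Nightly, so a default test run skips them. Assuming the stock lucene-solr Ant setup of this vintage (the exact flags belong to the randomizedtesting runner, so treat this as illustrative rather than authoritative), something like the following should exercise the HDFS variant:

    ant test -Dtestcase=HdfsTlogReplayBufferedWhileIndexingTest -Dtests.nightly=true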