mirror of https://github.com/apache/lucene.git
SOLR-8575: Revert while investigated. (reverted from commit ec4c72310f
)
This commit is contained in:
parent
84637e99b1
commit
f6098148ae
|
@ -508,8 +508,6 @@ Bug Fixes
|
||||||
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
|
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
|
||||||
(Jan Høydahl, Steve Rowe)
|
(Jan Høydahl, Steve Rowe)
|
||||||
|
|
||||||
* SOLR-8575: Fix HDFSLogReader replay status numbers and a performance bug where we can reopen
|
|
||||||
FSDataInputStream too often. (Mark Miller, Patrick Dvorack)
|
|
||||||
|
|
||||||
* SOLR-8651: The commitWithin parameter is not passed on for deleteById in UpdateRequest in
|
* SOLR-8651: The commitWithin parameter is not passed on for deleteById in UpdateRequest in
|
||||||
distributed queries (Jessica Cheng Mallet via Erick Erickson)
|
distributed queries (Jessica Cheng Mallet via Erick Erickson)
|
||||||
|
|
|
@ -390,18 +390,16 @@ public class HdfsTransactionLog extends TransactionLog {
|
||||||
|
|
||||||
// we actually need a new reader to
|
// we actually need a new reader to
|
||||||
// see if any data was added by the writer
|
// see if any data was added by the writer
|
||||||
if (pos >= sz) {
|
if (fis.position() >= sz) {
|
||||||
log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
|
|
||||||
|
|
||||||
synchronized (HdfsTransactionLog.this) {
|
|
||||||
sz = fos.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
fis.close();
|
fis.close();
|
||||||
tlogOutStream.hflush();
|
tlogOutStream.hflush();
|
||||||
|
try {
|
||||||
FSDataInputStream fdis = fs.open(tlogFile);
|
FSDataInputStream fdis = fs.open(tlogFile);
|
||||||
fis = new FSDataFastInputStream(fdis, pos);
|
fis = new FSDataFastInputStream(fdis, pos);
|
||||||
|
sz = fs.getFileStatus(tlogFile).getLen();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pos == 0) {
|
if (pos == 0) {
|
||||||
|
@ -448,7 +446,7 @@ public class HdfsTransactionLog extends TransactionLog {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long currentSize() {
|
public long currentSize() {
|
||||||
return fos.size();
|
return sz;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -606,3 +604,5 @@ class FSDataFastInputStream extends FastInputStream {
|
||||||
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
|
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1324,7 +1324,7 @@ public class UpdateLog implements PluginInfoInitialized {
|
||||||
loglog.info(
|
loglog.info(
|
||||||
"log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
|
"log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
|
||||||
translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
|
translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
|
||||||
Math.floor(cpos / (double) csize * 100.));
|
Math.round(cpos / (double) csize * 100.));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,136 +0,0 @@
|
||||||
package org.apache.solr.cloud;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
|
||||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
@Slow
|
|
||||||
@Nightly
|
|
||||||
@SuppressSSL
|
|
||||||
public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {
|
|
||||||
|
|
||||||
private List<StoppableIndexingThread> threads;
|
|
||||||
|
|
||||||
public TlogReplayBufferedWhileIndexingTest() throws Exception {
|
|
||||||
super();
|
|
||||||
sliceCount = 1;
|
|
||||||
fixShardCount(2);
|
|
||||||
schemaString = "schema15.xml"; // we need a string id
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void beforeRestartWhileUpdatingTest() throws Exception {
|
|
||||||
System.setProperty("leaderVoteWait", "300000");
|
|
||||||
System.setProperty("solr.autoCommit.maxTime", "10000");
|
|
||||||
System.setProperty("solr.autoSoftCommit.maxTime", "3000");
|
|
||||||
if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void afterRestartWhileUpdatingTest() {
|
|
||||||
System.clearProperty("leaderVoteWait");
|
|
||||||
System.clearProperty("solr.autoCommit.maxTime");
|
|
||||||
System.clearProperty("solr.autoSoftCommit.maxTime");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test() throws Exception {
|
|
||||||
handle.clear();
|
|
||||||
handle.put("timestamp", SKIPVAL);
|
|
||||||
|
|
||||||
waitForRecoveriesToFinish(false);
|
|
||||||
|
|
||||||
int numThreads = 1;
|
|
||||||
|
|
||||||
threads = new ArrayList<>(numThreads);
|
|
||||||
|
|
||||||
ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
|
|
||||||
allJetty.addAll(jettys);
|
|
||||||
allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
|
|
||||||
assert allJetty.size() == 1 : allJetty.size();
|
|
||||||
ChaosMonkey.stop(allJetty.get(0));
|
|
||||||
|
|
||||||
StoppableIndexingThread indexThread;
|
|
||||||
for (int i = 0; i < numThreads; i++) {
|
|
||||||
indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), false, 50000, 1, false);
|
|
||||||
threads.add(indexThread);
|
|
||||||
indexThread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
ChaosMonkey.start(allJetty.get(0));
|
|
||||||
|
|
||||||
Thread.sleep(45000);
|
|
||||||
|
|
||||||
waitForThingsToLevelOut(320);
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);
|
|
||||||
|
|
||||||
for (StoppableIndexingThread thread : threads) {
|
|
||||||
thread.safeStop();
|
|
||||||
thread.safeStop();
|
|
||||||
}
|
|
||||||
|
|
||||||
waitForThingsToLevelOut(30);
|
|
||||||
|
|
||||||
checkShardConsistency(false, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void indexDoc(SolrInputDocument doc) throws IOException,
|
|
||||||
SolrServerException {
|
|
||||||
cloudClient.add(doc);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void distribTearDown() throws Exception {
|
|
||||||
// make sure threads have been stopped...
|
|
||||||
if (threads != null) {
|
|
||||||
for (StoppableIndexingThread thread : threads) {
|
|
||||||
thread.safeStop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
super.distribTearDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip the randoms - they can deadlock...
|
|
||||||
@Override
|
|
||||||
protected void indexr(Object... fields) throws Exception {
|
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
|
||||||
addFields(doc, fields);
|
|
||||||
addFields(doc, "rnd_b", true);
|
|
||||||
indexDoc(doc);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,63 +0,0 @@
|
||||||
package org.apache.solr.cloud.hdfs;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
|
||||||
import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
|
|
||||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
|
||||||
|
|
||||||
@Slow
|
|
||||||
@Nightly
|
|
||||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
|
||||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
|
||||||
})
|
|
||||||
public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {
|
|
||||||
|
|
||||||
public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static MiniDFSCluster dfsCluster;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupClass() throws Exception {
|
|
||||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
|
||||||
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void teardownClass() throws Exception {
|
|
||||||
HdfsTestUtil.teardownClass(dfsCluster);
|
|
||||||
dfsCluster = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getDataDir(String dataDir) throws IOException {
|
|
||||||
return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue