mirror of https://github.com/apache/lucene.git
SOLR-8575: Fix HDFSLogReader replay status numbers, a performance bug where we can reopen FSDataInputStream much too often, and an hdfs tlog data integrity bug.
This commit is contained in:
parent
677779086c
commit
4cc844897e
|
@ -522,6 +522,9 @@ Bug Fixes
|
|||
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
|
||||
(Jan Høydahl, Steve Rowe)
|
||||
|
||||
* SOLR-8575: Fix HDFSLogReader replay status numbers, a performance bug where we can reopen
|
||||
FSDataInputStream much too often, and an hdfs tlog data integrity bug.
|
||||
(Mark Miller, Patrick Dvorack, yonik)
|
||||
|
||||
* SOLR-8651: The commitWithin parameter is not passed on for deleteById in UpdateRequest in
|
||||
distributed queries (Jessica Cheng Mallet via Erick Erickson)
|
||||
|
|
|
@ -360,8 +360,15 @@ public class HdfsTransactionLog extends TransactionLog {
|
|||
super();
|
||||
incref();
|
||||
try {
|
||||
|
||||
synchronized (HdfsTransactionLog.this) {
|
||||
fos.flushBuffer();
|
||||
sz = fos.size();
|
||||
}
|
||||
|
||||
tlogOutStream.hflush();
|
||||
|
||||
FSDataInputStream fdis = fs.open(tlogFile);
|
||||
sz = fs.getFileStatus(tlogFile).getLen();
|
||||
fis = new FSDataFastInputStream(fdis, startingPos);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -384,22 +391,23 @@ public class HdfsTransactionLog extends TransactionLog {
|
|||
if (pos >= fos.size()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
fos.flushBuffer();
|
||||
}
|
||||
|
||||
// we actually need a new reader to
|
||||
// see if any data was added by the writer
|
||||
if (fis.position() >= sz) {
|
||||
fis.close();
|
||||
tlogOutStream.hflush();
|
||||
try {
|
||||
FSDataInputStream fdis = fs.open(tlogFile);
|
||||
fis = new FSDataFastInputStream(fdis, pos);
|
||||
sz = fs.getFileStatus(tlogFile).getLen();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
if (pos >= sz) {
|
||||
log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
|
||||
|
||||
synchronized (HdfsTransactionLog.this) {
|
||||
fos.flushBuffer();
|
||||
sz = fos.size();
|
||||
}
|
||||
|
||||
tlogOutStream.hflush();
|
||||
fis.close();
|
||||
|
||||
FSDataInputStream fdis = fs.open(tlogFile);
|
||||
fis = new FSDataFastInputStream(fdis, pos);
|
||||
}
|
||||
|
||||
if (pos == 0) {
|
||||
|
@ -446,7 +454,7 @@ public class HdfsTransactionLog extends TransactionLog {
|
|||
|
||||
@Override
|
||||
public long currentSize() {
|
||||
return sz;
|
||||
return fos.size();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -604,5 +612,3 @@ class FSDataFastInputStream extends FastInputStream {
|
|||
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
|||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.RTimer;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.apache.solr.util.TestInjection;
|
||||
import org.apache.solr.util.plugin.PluginInfoInitialized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -1324,7 +1325,7 @@ public class UpdateLog implements PluginInfoInitialized {
|
|||
loglog.info(
|
||||
"log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
|
||||
translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
|
||||
Math.round(cpos / (double) csize * 100.));
|
||||
Math.floor(cpos / (double) csize * 100.));
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -1439,6 +1440,7 @@ public class UpdateLog implements PluginInfoInitialized {
|
|||
loglog.warn("REPLAY_ERR: Exception replaying log", ex);
|
||||
// something wrong with the request?
|
||||
}
|
||||
TestInjection.injectUpdateLogReplayRandomPause();
|
||||
}
|
||||
|
||||
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
|
||||
|
|
|
@ -318,6 +318,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
// if we are in zk mode...
|
||||
if (zkEnabled) {
|
||||
|
||||
assert TestInjection.injectUpdateRandomPause();
|
||||
|
||||
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
|
||||
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
|
||||
forwardToLeader = false;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.solr.util;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
||||
|
@ -31,10 +32,14 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Allows random faults to be injected in running code during test runs.
|
||||
*
|
||||
* Set static strings to "true" or "false" or "true:60" for true 60% of the time.
|
||||
*/
|
||||
public class TestInjection {
|
||||
|
||||
|
@ -46,6 +51,8 @@ public class TestInjection {
|
|||
|
||||
}
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final Pattern ENABLED_PERCENT = Pattern.compile("(true|false)(?:\\:(\\d+))?$", Pattern.CASE_INSENSITIVE);
|
||||
private static final Random RANDOM;
|
||||
|
||||
|
@ -68,15 +75,19 @@ public class TestInjection {
|
|||
|
||||
public static String nonExistentCoreExceptionAfterUnload = null;
|
||||
|
||||
public static String updateLogReplayRandomPause = null;
|
||||
|
||||
public static String updateRandomPause = null;
|
||||
|
||||
private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
|
||||
|
||||
|
||||
|
||||
public static void reset() {
|
||||
nonGracefullClose = null;
|
||||
failReplicaRequests = null;
|
||||
failUpdateRequests = null;
|
||||
nonExistentCoreExceptionAfterUnload = null;
|
||||
updateLogReplayRandomPause = null;
|
||||
updateRandomPause = null;
|
||||
|
||||
for (Timer timer : timers) {
|
||||
timer.cancel();
|
||||
|
@ -160,6 +171,44 @@ public class TestInjection {
|
|||
return true;
|
||||
}
|
||||
|
||||
public static boolean injectUpdateLogReplayRandomPause() {
|
||||
if (updateLogReplayRandomPause != null) {
|
||||
Pair<Boolean,Integer> pair = parseValue(updateLogReplayRandomPause);
|
||||
boolean enabled = pair.getKey();
|
||||
int chanceIn100 = pair.getValue();
|
||||
if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
|
||||
long rndTime = RANDOM.nextInt(1000);
|
||||
log.info("inject random log replay delay of {}ms", rndTime);
|
||||
try {
|
||||
Thread.sleep(rndTime);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public static boolean injectUpdateRandomPause() {
|
||||
if (updateRandomPause != null) {
|
||||
Pair<Boolean,Integer> pair = parseValue(updateRandomPause);
|
||||
boolean enabled = pair.getKey();
|
||||
int chanceIn100 = pair.getValue();
|
||||
if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
|
||||
long rndTime = RANDOM.nextInt(1000);
|
||||
log.info("inject random update delay of {}ms", rndTime);
|
||||
try {
|
||||
Thread.sleep(rndTime);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static Pair<Boolean,Integer> parseValue(String raw) {
|
||||
Matcher m = ENABLED_PERCENT.matcher(raw);
|
||||
if (!m.matches()) throw new RuntimeException("No match, probably bad syntax: " + raw);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud;
|
|||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
|
@ -46,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
@Slow
|
||||
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
|
||||
@ThreadLeakLingering(linger = 60000)
|
||||
@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
|
||||
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
|
||||
private static final int FAIL_TOLERANCE = 60;
|
||||
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.util.TestInjection;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
@Slow
|
||||
@Nightly
|
||||
@SuppressSSL
|
||||
@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
|
||||
public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {
|
||||
|
||||
private List<StoppableIndexingThread> threads;
|
||||
|
||||
public TlogReplayBufferedWhileIndexingTest() throws Exception {
|
||||
super();
|
||||
sliceCount = 1;
|
||||
fixShardCount(2);
|
||||
schemaString = "schema15.xml"; // we need a string id
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeRestartWhileUpdatingTest() throws Exception {
|
||||
System.setProperty("leaderVoteWait", "300000");
|
||||
System.setProperty("solr.autoCommit.maxTime", "10000");
|
||||
System.setProperty("solr.autoSoftCommit.maxTime", "3000");
|
||||
TestInjection.updateLogReplayRandomPause = "true:10";
|
||||
TestInjection.updateRandomPause = "true:10";
|
||||
if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterRestartWhileUpdatingTest() {
|
||||
System.clearProperty("leaderVoteWait");
|
||||
System.clearProperty("solr.autoCommit.maxTime");
|
||||
System.clearProperty("solr.autoSoftCommit.maxTime");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
handle.clear();
|
||||
handle.put("timestamp", SKIPVAL);
|
||||
|
||||
waitForRecoveriesToFinish(false);
|
||||
|
||||
int numThreads = 3;
|
||||
|
||||
threads = new ArrayList<>(numThreads);
|
||||
|
||||
ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
|
||||
allJetty.addAll(jettys);
|
||||
allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
|
||||
assert allJetty.size() == 1 : allJetty.size();
|
||||
ChaosMonkey.stop(allJetty.get(0));
|
||||
|
||||
StoppableIndexingThread indexThread;
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
boolean pauseBetweenUpdates = random().nextBoolean();
|
||||
int batchSize = random().nextInt(4) + 1;
|
||||
indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, 900, batchSize, pauseBetweenUpdates);
|
||||
threads.add(indexThread);
|
||||
indexThread.start();
|
||||
}
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
ChaosMonkey.start(allJetty.get(0));
|
||||
|
||||
Thread.sleep(45000);
|
||||
|
||||
waitForThingsToLevelOut(440);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);
|
||||
|
||||
for (StoppableIndexingThread thread : threads) {
|
||||
thread.safeStop();
|
||||
}
|
||||
|
||||
waitForThingsToLevelOut(30);
|
||||
|
||||
checkShardConsistency(false, false);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void indexDoc(SolrInputDocument doc) throws IOException,
|
||||
SolrServerException {
|
||||
cloudClient.add(doc);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void distribTearDown() throws Exception {
|
||||
// make sure threads have been stopped...
|
||||
if (threads != null) {
|
||||
for (StoppableIndexingThread thread : threads) {
|
||||
thread.safeStop();
|
||||
}
|
||||
|
||||
for (StoppableIndexingThread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
super.distribTearDown();
|
||||
}
|
||||
|
||||
// skip the randoms - they can deadlock...
|
||||
@Override
|
||||
protected void indexr(Object... fields) throws Exception {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
addFields(doc, fields);
|
||||
addFields(doc, "rnd_b", true);
|
||||
indexDoc(doc);
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
|||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
|
||||
import org.apache.solr.cloud.ChaosMonkeyNothingIsSafeTest;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -32,6 +33,7 @@ import org.junit.BeforeClass;
|
|||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||
})
|
||||
@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
|
||||
public class HdfsChaosMonkeyNothingIsSafeTest extends ChaosMonkeyNothingIsSafeTest {
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
|
||||
@Slow
|
||||
@Nightly
|
||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||
})
|
||||
public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {
|
||||
|
||||
public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClass() throws Exception {
|
||||
HdfsTestUtil.teardownClass(dfsCluster);
|
||||
dfsCluster = null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String getDataDir(String dataDir) throws IOException {
|
||||
return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue