mirror of https://github.com/apache/lucene.git
SOLR-4531: Add tests to ensure that recovery does not fail on corrupted tlogs
This commit is contained in:
parent
ce57e8a8f4
commit
b7aa582dff
|
@ -341,6 +341,9 @@ Other Changes
|
|||
Last JVM garbage collection log solr_gc.log is moved into $SOLR_LOGS_DIR/archived/
|
||||
(janhoy)
|
||||
|
||||
* SOLR-4531: Add tests to ensure that recovery does not fail on corrupted tlogs.
|
||||
(Simon Scofield, Cao Manh Dat via shalin)
|
||||
|
||||
================== 6.2.1 ==================
|
||||
|
||||
Bug Fixes
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCloudRecovery extends SolrCloudTestCase {
|
||||
|
||||
private static final String COLLECTION = "collection1";
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||
|
||||
configureCluster(2)
|
||||
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
|
||||
.configure();
|
||||
|
||||
CollectionAdminRequest
|
||||
.createCollection(COLLECTION, "config", 2, 2)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
|
||||
false, true, 30);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void resetCollection() throws IOException, SolrServerException {
|
||||
cluster.getSolrClient().deleteByQuery(COLLECTION, "*:*");
|
||||
cluster.getSolrClient().commit(COLLECTION);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void leaderRecoverFromLogOnStartupTest() throws Exception {
|
||||
AtomicInteger countReplayLog = new AtomicInteger(0);
|
||||
DirectUpdateHandler2.commitOnClose = false;
|
||||
UpdateLog.testing_logReplayFinishHook = countReplayLog::incrementAndGet;
|
||||
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
cloudClient.add(COLLECTION, sdoc("id", "1"));
|
||||
cloudClient.add(COLLECTION, sdoc("id", "2"));
|
||||
cloudClient.add(COLLECTION, sdoc("id", "3"));
|
||||
cloudClient.add(COLLECTION, sdoc("id", "4"));
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("q", "*:*");
|
||||
QueryResponse resp = cloudClient.query(COLLECTION, params);
|
||||
assertEquals(0, resp.getResults().getNumFound());
|
||||
|
||||
ChaosMonkey.stop(cluster.getJettySolrRunners());
|
||||
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
|
||||
ChaosMonkey.start(cluster.getJettySolrRunners());
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), COLLECTION, 120000));
|
||||
|
||||
resp = cloudClient.query(COLLECTION, params);
|
||||
assertEquals(4, resp.getResults().getNumFound());
|
||||
// Make sure all nodes is recover from tlog
|
||||
assertEquals(4, countReplayLog.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void corruptedLogTest() throws Exception {
|
||||
AtomicInteger countReplayLog = new AtomicInteger(0);
|
||||
DirectUpdateHandler2.commitOnClose = false;
|
||||
UpdateLog.testing_logReplayFinishHook = countReplayLog::incrementAndGet;
|
||||
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
cloudClient.add(COLLECTION, sdoc("id", "1000"));
|
||||
cloudClient.add(COLLECTION, sdoc("id", "1001"));
|
||||
for (int i = 0; i < 10; i++) {
|
||||
cloudClient.add(COLLECTION, sdoc("id", String.valueOf(i)));
|
||||
}
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("q", "*:*");
|
||||
QueryResponse resp = cloudClient.query(COLLECTION, params);
|
||||
assertEquals(0, resp.getResults().getNumFound());
|
||||
|
||||
int logHeaderSize = Integer.MAX_VALUE;
|
||||
Map<File, byte[]> contentFiles = new HashMap<>();
|
||||
for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
|
||||
for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
|
||||
File tlogFolder = new File(solrCore.getUlogDir(), UpdateLog.TLOG_NAME);
|
||||
String[] tLogFiles = tlogFolder.list();
|
||||
Arrays.sort(tLogFiles);
|
||||
File lastTLogFile = new File(tlogFolder.getAbsolutePath() + "/" + tLogFiles[tLogFiles.length - 1]);
|
||||
byte[] tlogBytes = IOUtils.toByteArray(new FileInputStream(lastTLogFile));
|
||||
contentFiles.put(lastTLogFile, tlogBytes);
|
||||
logHeaderSize = Math.min(tlogBytes.length, logHeaderSize);
|
||||
}
|
||||
}
|
||||
|
||||
ChaosMonkey.stop(cluster.getJettySolrRunners());
|
||||
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
|
||||
|
||||
for (Map.Entry<File, byte[]> entry : contentFiles.entrySet()) {
|
||||
byte[] tlogBytes = entry.getValue();
|
||||
|
||||
if (tlogBytes.length <= logHeaderSize) continue;
|
||||
FileOutputStream stream = new FileOutputStream(entry.getKey());
|
||||
int skipLastBytes = Math.max(random().nextInt(tlogBytes.length - logHeaderSize), 2);
|
||||
for (int i = 0; i < entry.getValue().length - skipLastBytes; i++) {
|
||||
stream.write(tlogBytes[i]);
|
||||
}
|
||||
stream.close();
|
||||
}
|
||||
|
||||
ChaosMonkey.start(cluster.getJettySolrRunners());
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), COLLECTION, 120000));
|
||||
|
||||
resp = cloudClient.query(COLLECTION, params);
|
||||
// Make sure cluster still healthy
|
||||
assertTrue(resp.getResults().getNumFound() >= 2);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestLeaderRecoverFromLogOnStartup extends AbstractFullDistribZkTestBase {
|
||||
@Override
|
||||
public void distribSetUp() throws Exception {
|
||||
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
|
||||
super.distribSetUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void test() throws Exception {
|
||||
AtomicInteger countReplayLog = new AtomicInteger(0);
|
||||
DirectUpdateHandler2.commitOnClose = false;
|
||||
UpdateLog.testing_logReplayFinishHook = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
countReplayLog.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
String testCollectionName = "testCollection";
|
||||
createCollection(testCollectionName, 2, 2, 1);
|
||||
waitForRecoveriesToFinish(false);
|
||||
|
||||
cloudClient.setDefaultCollection(testCollectionName);
|
||||
cloudClient.add(sdoc("id", "1"));
|
||||
cloudClient.add(sdoc("id", "2"));
|
||||
cloudClient.add(sdoc("id", "3"));
|
||||
cloudClient.add(sdoc("id", "4"));
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("q", "*:*");
|
||||
QueryResponse resp = cloudClient.query(params);
|
||||
assertEquals(0, resp.getResults().getNumFound());
|
||||
|
||||
ChaosMonkey.stop(jettys);
|
||||
ChaosMonkey.stop(controlJetty);
|
||||
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
|
||||
ChaosMonkey.start(jettys);
|
||||
ChaosMonkey.start(controlJetty);
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), testCollectionName, 120000));
|
||||
|
||||
cloudClient.commit();
|
||||
resp = cloudClient.query(params);
|
||||
assertEquals(4, resp.getResults().getNumFound());
|
||||
// Make sure all nodes is recover from tlog
|
||||
assertEquals(4, countReplayLog.get());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue