From 57b47fed018c1e77cabd3d31a0fc7965a3badf16 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Tue, 24 Mar 2015 03:19:37 +0000 Subject: [PATCH] SOLR-7134: Replication can still cause index corruption. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1668779 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../apache/solr/cloud/ElectionContext.java | 2 +- .../apache/solr/cloud/RecoveryStrategy.java | 11 +- .../org/apache/solr/handler/IndexFetcher.java | 107 ++++++++++++------ .../solr/handler/admin/CoreAdminHandler.java | 4 +- .../component/RealTimeGetComponent.java | 20 ++++ .../solr/update/DefaultSolrCoreState.java | 10 ++ .../java/org/apache/solr/update/PeerSync.java | 4 +- .../org/apache/solr/update/SolrCoreState.java | 3 + .../processor/DistributedUpdateProcessor.java | 2 + .../solr/cloud/ChaosMonkeySafeLeaderTest.java | 17 ++- .../org/apache/solr/cloud/RecoveryZkTest.java | 4 +- .../apache/solr/cloud/hdfs/HdfsTestUtil.java | 13 ++- .../HdfsWriteToMultipleCollectionsTest.java | 2 +- .../solr/handler/TestReplicationHandler.java | 5 +- .../solr/cloud/StopableIndexingThread.java | 46 ++++---- 16 files changed, 177 insertions(+), 75 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 92a2ebd7a11..1f2d0f2f620 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -282,6 +282,8 @@ Bug Fixes * SOLR-7286: Using HDFS's FileSystem.newInstance does not guarantee a new instance. (Mark Miller) +* SOLR-7134: Replication can still cause index corruption. (Mark Miller, shalin, Mike Drob) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 912116bc407..2c12ebb0363 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -305,7 +305,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.error("Error in solrcloud_debug block", e); } } if (!success) { diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 7d5836385fd..1b8fafb6663 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -160,8 +160,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { boolean success = replicationHandler.doFetch(solrParams, false); if (!success) { - throw new SolrException(ErrorCode.SERVER_ERROR, - "Replication for recovery failed."); + throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed."); } // solrcloud_debug @@ -179,7 +178,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { + " from " + leaderUrl + " gen:" - + core.getDeletionPolicy().getLatestCommit().getGeneration() + + core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir() + " index:" + core.getIndexDir() + " newIndex:" + core.getNewIndexDir() @@ -189,7 +188,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.debug("Error in solrcloud_debug block", e); } } @@ -409,7 +408,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.debug("Error in solrcloud_debug block", e); } } @@ -557,7 +556,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.debug("Error in solrcloud_debug block", e); } } diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 3cda5ef61f3..cbe07df7284 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -245,25 +245,37 @@ public class IndexFetcher { } } - private boolean successfulInstall = false; - + boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException { + return fetchLatestIndex(core, forceReplication, false); + } + /** * This command downloads all the necessary files from master to install a index commit point. Only changed files are * downloaded. It also downloads the conf files (if they are modified). * * @param core the SolrCore * @param forceReplication force a replication in all cases + * @param forceCoreReload force a core reload in all cases * @return true on success, false if slave is already in sync * @throws IOException if an exception occurs */ - boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException { - successfulInstall = false; + boolean fetchLatestIndex(final SolrCore core, boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException { + boolean cleanupDone = false; + boolean successfulInstall = false; replicationStartTime = System.currentTimeMillis(); Directory tmpIndexDir = null; String tmpIndex = null; Directory indexDir = null; String indexDirPath = null; boolean deleteTmpIdxDir = true; + + if (!core.getSolrCoreState().getLastReplicateIndexSuccess()) { + // if the last replication was not a success, we force a full replication + // when we are a bit more confident we may want to try a partial replication + // if the error is connection related or something, but we have to be careful + forceReplication = true; + } + try { //get the current 'replicateable' index version in the master NamedList response = null; @@ -404,6 +416,7 @@ public class IndexFetcher { + " secs"); Collection> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload); if (!modifiedConfFiles.isEmpty()) { + reloadCore = true; downloadConfFiles(confFilesToDownload, latestGeneration); if (isFullCopyNeeded) { successfulInstall = modifyIndexProps(tmpIdxDirName); @@ -426,7 +439,6 @@ public class IndexFetcher { logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);// write to a file time of replication and // conf files. - reloadCore = true; } } else { terminateAndWaitFsyncService(); @@ -448,7 +460,8 @@ public class IndexFetcher { } // we must reload the core after we open the IW back up - if (reloadCore) { + if (successfulInstall && (reloadCore || forceCoreReload)) { + LOG.info("Reloading SolrCore {}", core.getName()); reloadCore(); } @@ -469,6 +482,16 @@ public class IndexFetcher { openNewSearcherAndUpdateCommitPoint(); } + if (!isFullCopyNeeded && !forceReplication && !successfulInstall) { + cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall); + cleanupDone = true; + // we try with a full copy of the index + LOG.warn( + "Replication attempt was not successful - trying a full index replication reloadCore={}", + reloadCore); + successfulInstall = fetchLatestIndex(core, true, reloadCore); + } + replicationStartTime = 0; return successfulInstall; } catch (ReplicationHandlerException e) { @@ -482,41 +505,51 @@ public class IndexFetcher { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e); } } finally { - try { - if (!successfulInstall) { - try { - logReplicationTimeAndConfFiles(null, successfulInstall); - } catch(Exception e) { - LOG.error("caught", e); - } - } - filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null; - replicationStartTime = 0; - dirFileFetcher = null; - localFileFetcher = null; - if (fsyncService != null && !fsyncService.isShutdown()) fsyncService - .shutdownNow(); - fsyncService = null; - stop = false; - fsyncException = null; - } finally { - if (deleteTmpIdxDir && tmpIndexDir != null) { - try { - core.getDirectoryFactory().doneWithDirectory(tmpIndexDir); - core.getDirectoryFactory().remove(tmpIndexDir); - } catch (IOException e) { - SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e); - } - } + if (!cleanupDone) { + cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall); + } + } + } - if (tmpIndexDir != null) { - core.getDirectoryFactory().release(tmpIndexDir); + private void cleanup(final SolrCore core, Directory tmpIndexDir, + Directory indexDir, boolean deleteTmpIdxDir, boolean successfulInstall) throws IOException { + try { + if (!successfulInstall) { + try { + logReplicationTimeAndConfFiles(null, successfulInstall); + } catch (Exception e) { + LOG.error("caught", e); } - - if (indexDir != null) { - core.getDirectoryFactory().release(indexDir); + } + + core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall); + + filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null; + replicationStartTime = 0; + dirFileFetcher = null; + localFileFetcher = null; + if (fsyncService != null && !fsyncService.isShutdown()) fsyncService + .shutdownNow(); + fsyncService = null; + stop = false; + fsyncException = null; + } finally { + if (deleteTmpIdxDir && tmpIndexDir != null) { + try { + core.getDirectoryFactory().doneWithDirectory(tmpIndexDir); + core.getDirectoryFactory().remove(tmpIndexDir); + } catch (IOException e) { + SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e); } } + + if (tmpIndexDir != null) { + core.getDirectoryFactory().release(tmpIndexDir); + } + + if (indexDir != null) { + core.getDirectoryFactory().release(indexDir); + } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index f32901a6232..30b03564391 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -832,7 +832,7 @@ public class CoreAdminHandler extends RequestHandlerBase { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.debug("Error in solrcloud_debug block", e); } } if (!success) { @@ -1011,7 +1011,7 @@ public class CoreAdminHandler extends RequestHandlerBase { searchHolder.decref(); } } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + log.debug("Error in solrcloud_debug block", e); } } } diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index a8ae0896676..6757f3e732e 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -29,6 +29,7 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.StorableField; import org.apache.lucene.index.StoredDocument; import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.solr.client.solrj.SolrResponse; @@ -98,6 +99,25 @@ public class RealTimeGetComponent extends SearchComponent val = params.get("getUpdates"); if (val != null) { + // solrcloud_debug + if (log.isDebugEnabled()) { + try { + RefCounted searchHolder = req.getCore() + .getNewestSearcher(false); + SolrIndexSearcher searcher = searchHolder.get(); + try { + log.debug(req.getCore().getCoreDescriptor() + .getCoreContainer().getZkController().getNodeName() + + " min count to sync to (from most recent searcher view) " + + searcher.search(new MatchAllDocsQuery(), 1).totalHits); + } finally { + searchHolder.decref(); + } + } catch (Exception e) { + log.debug("Error in solrcloud_debug block", e); + } + } + processGetUpdates(rb); return; } diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java index 0e6ce14dc34..d38fc6b3ad8 100644 --- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java @@ -53,6 +53,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover private volatile boolean recoveryRunning; private RecoveryStrategy recoveryStrat; + private volatile boolean lastReplicationSuccess = true; private RefCounted refCntWriter; @@ -377,5 +378,14 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover return leaderThrottle; } + @Override + public boolean getLastReplicateIndexSuccess() { + return lastReplicationSuccess; + } + + @Override + public void setLastReplicateIndexSuccess(boolean success) { + this.lastReplicationSuccess = success; + } } diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index 77b26a9fb40..1b3bf96353b 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -495,7 +495,7 @@ public class PeerSync { cmd.setVersion(version); cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT); if (debug) { - log.debug(msg() + "add " + cmd); + log.debug(msg() + "add " + cmd + " id " + sdoc.getField("id")); } proc.processAdd(cmd); break; @@ -508,7 +508,7 @@ public class PeerSync { cmd.setVersion(version); cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT); if (debug) { - log.debug(msg() + "delete " + cmd); + log.debug(msg() + "delete " + cmd + " " + new BytesRef(idBytes).utf8ToString()); } proc.processDelete(cmd); break; diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java index e7467a0ea06..b14b5ac483a 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java @@ -146,4 +146,7 @@ public abstract class SolrCoreState { */ public abstract ActionThrottle getLeaderThrottle(); + public abstract boolean getLastReplicateIndexSuccess(); + + public abstract void setLastReplicateIndexSuccess(boolean success); } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 3c201ad9152..b47c4ace655 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -1081,6 +1081,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { // This update is a repeat, or was reordered. We need to drop this update. + log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); return true; } @@ -1568,6 +1569,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { // This update is a repeat, or was reordered. We need to drop this update. + log.debug("Dropping delete update due to version {}", idBytes.utf8ToString()); return true; } } diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java index b7d815cb297..d9e835709b6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java @@ -112,8 +112,21 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase { List threads = new ArrayList<>(); int threadCount = 2; + int batchSize = 1; + if (random().nextBoolean()) { + batchSize = random().nextInt(98) + 2; + } + + boolean pauseBetweenUpdates = TEST_NIGHTLY ? random().nextBoolean() : true; + int maxUpdates = -1; + if (!pauseBetweenUpdates) { + maxUpdates = 1000 + random().nextInt(1000); + } else { + maxUpdates = 15000; + } + for (int i = 0; i < threadCount; i++) { - StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true); + StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1 threads.add(indexThread); indexThread.start(); } @@ -158,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase { waitForThingsToLevelOut(180000); - checkShardConsistency(true, true); + checkShardConsistency(batchSize == 1, true); if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n"); diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java index 9966178138a..f828b933d37 100644 --- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java @@ -72,10 +72,10 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase { maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)]; } - indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc); + indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true); indexThread.start(); - indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc); + indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true); indexThread2.start(); diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java index 5f02a6876ca..69e035f4b63 100644 --- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java +++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java @@ -18,6 +18,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -37,6 +39,7 @@ import org.apache.solr.common.util.IOUtils; */ public class HdfsTestUtil { + private static Logger log = LoggerFactory.getLogger(HdfsTestUtil.class); private static Locale savedLocale; @@ -131,7 +134,15 @@ public class HdfsTestUtil { if (timer != null) { timer.cancel(); } - dfsCluster.shutdown(); + try { + dfsCluster.shutdown(); + } catch (Error e) { + // Added in SOLR-7134 + // Rarely, this can fail to either a NullPointerException + // or a class not found exception. The later may fixable + // by adding test dependencies. + log.warn("Exception shutting down dfsCluster", e); + } } // TODO: we HACK around HADOOP-9643 diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java index ba198cee107..45946e4887e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java @@ -105,7 +105,7 @@ public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest { CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress()); client.setDefaultCollection(ACOLLECTION + i); cloudClients.add(client); - StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount); + StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount, 1, true); threads.add(indexThread); indexThread.start(); } diff --git a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java index c0b80c03b38..feed24acbe1 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java @@ -301,8 +301,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { details.get("slave")); // SOLR-2677: assert not false negatives Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED); - assertEquals("slave has fetch error count", - null, timesFailed); + // SOLR-7134: we can have a fail because some mock index files have no checksum, will + // always be downloaded, and may not be able to be moved into the existing index + assertTrue("slave has fetch error count: " + (String)timesFailed, timesFailed == null || ((String) timesFailed).equals("1")); if (3 != i) { // index & fetch diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java b/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java index 3f04d7050a0..f6832112477 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java @@ -42,18 +42,23 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab private SolrClient cloudClient; private int numDeletes; private int numAdds; - + private List docs = new ArrayList(); + private int batchSize; + private boolean pauseBetweenUpdates; + public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) { - this(controlClient, cloudClient, id, doDeletes, -1); + this(controlClient, cloudClient, id, doDeletes, -1, 1, true); } - public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles) { + public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles, int batchSize, boolean pauseBetweenUpdates) { super("StopableIndexingThread"); this.controlClient = controlClient; this.cloudClient = cloudClient; this.id = id; this.doDeletes = doDeletes; this.numCycles = numCycles; + this.batchSize = batchSize; + this.pauseBetweenUpdates = pauseBetweenUpdates; setDaemon(true); } @@ -100,8 +105,17 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab try { numAdds++; - indexr("id", id, i1, 50, t1, + SolrInputDocument doc = new SolrInputDocument(); + addFields(doc, "id", id, i1, 50, t1, "to come to the aid of their country."); + addFields(doc, "rnd_b", true); + + docs.add(doc); + + if (docs.size() >= batchSize) { + indexDocs(docs); + docs.clear(); + } } catch (Exception e) { addFailed = true; System.err.println("REQUEST FAILED for id=" + id); @@ -117,10 +131,12 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab deletes.add(id); } - try { - Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(100)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (docs.size() > 0 && pauseBetweenUpdates) { + try { + Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(500) + 50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } @@ -151,26 +167,18 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab } } - protected void indexr(Object... fields) throws Exception { - SolrInputDocument doc = new SolrInputDocument(); - addFields(doc, fields); - addFields(doc, "rnd_b", true); - indexDoc(doc); - } - - protected void indexDoc(SolrInputDocument doc) throws IOException, + protected void indexDocs(List docs) throws IOException, SolrServerException { if (controlClient != null) { UpdateRequest req = new UpdateRequest(); - req.add(doc); + req.add(docs); req.setParam("CONTROL", "TRUE"); req.process(controlClient); } - UpdateRequest ureq = new UpdateRequest(); - ureq.add(doc); + ureq.add(docs); ureq.process(cloudClient); }