diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 09f0062352d..fd907f8e512 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -77,18 +77,6 @@ - - - - - - - - - - - - @@ -175,30 +163,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index db8d1ee29be..e358bc57798 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -57,25 +57,37 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos public static final Setting INDEX_BUFFER_SIZE_SETTING = Setting.memorySizeSetting("indices.memory.index_buffer_size", "10%", Property.NodeScope); - /** Only applies when indices.memory.index_buffer_size is a %, to set a floor on the actual size in bytes (default: 48 MB). */ - public static final Setting MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.min_index_buffer_size", - new ByteSizeValue(48, ByteSizeUnit.MB), - new ByteSizeValue(0, ByteSizeUnit.BYTES), - new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), - Property.NodeScope); + /** Only applies when indices.memory.index_buffer_size is a %, + * to set a floor on the actual size in bytes (default: 48 MB). */ + public static final Setting MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting( + "indices.memory.min_index_buffer_size", + new ByteSizeValue(48, ByteSizeUnit.MB), + new ByteSizeValue(0, ByteSizeUnit.BYTES), + new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), + Property.NodeScope); - /** Only applies when indices.memory.index_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ - public static final Setting MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.max_index_buffer_size", - new ByteSizeValue(-1), - new ByteSizeValue(-1), - new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), - Property.NodeScope); + /** Only applies when indices.memory.index_buffer_size is a %, + * to set a ceiling on the actual size in bytes (default: not set). */ + public static final Setting MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting( + "indices.memory.max_index_buffer_size", + new ByteSizeValue(-1), + new ByteSizeValue(-1), + new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), + Property.NodeScope); - /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */ - public static final Setting SHARD_INACTIVE_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5), Property.NodeScope); + /** If we see no indexing operations after this much time for a given shard, + * we consider that shard inactive (default: 5 minutes). */ + public static final Setting SHARD_INACTIVE_TIME_SETTING = Setting.positiveTimeSetting( + "indices.memory.shard_inactive_time", + TimeValue.timeValueMinutes(5), + Property.NodeScope + ); /** How frequently we check indexing memory usage (default: 5 seconds). */ - public static final Setting SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.interval", TimeValue.timeValueSeconds(5), Property.NodeScope); + public static final Setting SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.positiveTimeSetting( + "indices.memory.interval", + TimeValue.timeValueSeconds(5), + Property.NodeScope); private final ThreadPool threadPool; @@ -251,10 +263,11 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos totalBytes = bytesWrittenSinceCheck.get(); if (totalBytes > indexingBuffer.getBytes()/30) { bytesWrittenSinceCheck.addAndGet(-totalBytes); - // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is - // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against - // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes - // processed by indexing: + // NOTE: this is only an approximate check, because bytes written is to the translog, + // vs indexing memory buffer which is typically smaller but can be larger in extreme + // cases (many unique terms). This logic is here only as a safety against thread + // starvation or too infrequent checking, to ensure we are still checking periodically, + // in proportion to bytes processed by indexing: runUnlocked(); } } finally { @@ -313,7 +326,8 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos if (logger.isTraceEnabled()) { logger.trace("total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]", - new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING.getKey(), indexingBuffer, new ByteSizeValue(totalBytesWriting)); + new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING.getKey(), indexingBuffer, + new ByteSizeValue(totalBytesWriting)); } // If we are using more than 50% of our budget across both indexing buffer and bytes we are still moving to disk, then we now @@ -343,7 +357,8 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos if (shardBytesUsed > 0) { if (logger.isTraceEnabled()) { if (shardWritingBytes != 0) { - logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, shardWritingBytes); + logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, + shardWritingBytes); } else { logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed); } @@ -352,12 +367,14 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos } } - logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}], [{}] shards with non-zero indexing buffer", - new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING.getKey(), indexingBuffer, new ByteSizeValue(totalBytesWriting), queue.size()); + logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], " + + "currently writing bytes [{}], [{}] shards with non-zero indexing buffer", new ByteSizeValue(totalBytesUsed), + INDEX_BUFFER_SIZE_SETTING.getKey(), indexingBuffer, new ByteSizeValue(totalBytesWriting), queue.size()); while (totalBytesUsed > indexingBuffer.getBytes() && queue.isEmpty() == false) { ShardAndBytesUsed largest = queue.poll(); - logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed)); + logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", + largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed)); writeIndexingBufferAsync(largest.shard); totalBytesUsed -= largest.bytesUsed; if (doThrottle && throttled.contains(largest.shard) == false) { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 07f50dc30fa..ae108da7534 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -242,7 +242,8 @@ public class IndicesService extends AbstractLifecycleComponent this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() { @Override public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, long sizeInBytes) { - assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]"; + assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or " + + "equal to 0 and not [" + sizeInBytes + "]"; circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); } }); @@ -263,7 +264,8 @@ public class IndicesService extends AbstractLifecycleComponent @Override protected void doStop() { - ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown")); + ExecutorService indicesStopExecutor = + Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown")); // Copy indices because we modify it asynchronously in the body of the loop final Set indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet()); @@ -290,7 +292,13 @@ public class IndicesService extends AbstractLifecycleComponent @Override protected void doClose() { - IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, cacheCleaner, indicesRequestCache, indicesQueryCache); + IOUtils.closeWhileHandlingException( + analysisRegistry, + indexingMemoryController, + indicesFieldDataCache, + cacheCleaner, + indicesRequestCache, + indicesQueryCache); } /** @@ -426,7 +434,8 @@ public class IndicesService extends AbstractLifecycleComponent if (indexService == null) { throw new IndexNotFoundException(index); } - assert indexService.indexUUID().equals(index.getUUID()) : "uuid mismatch local: " + indexService.indexUUID() + " incoming: " + index.getUUID(); + assert indexService.indexUUID().equals(index.getUUID()) : "uuid mismatch local: " + indexService.indexUUID() + + " incoming: " + index.getUUID(); return indexService; } @@ -564,7 +573,8 @@ public class IndicesService extends AbstractLifecycleComponent /** * This method verifies that the given {@code metaData} holds sane values to create an {@link IndexService}. - * This method tries to update the meta data of the created {@link IndexService} if the given {@code metaDataUpdate} is different from the given {@code metaData}. + * This method tries to update the meta data of the created {@link IndexService} if the given {@code metaDataUpdate} + * is different from the given {@code metaData}. * This method will throw an exception if the creation or the update fails. * The created {@link IndexService} will not be registered and will be closed immediately. */ @@ -700,7 +710,8 @@ public class IndicesService extends AbstractLifecycleComponent } deleteIndexStore(reason, metaData, clusterState); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete unassigned index (reason [{}])", metaData.getIndex(), reason), e); + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete unassigned index (reason [{}])", + metaData.getIndex(), reason), e); } } } @@ -717,7 +728,8 @@ public class IndicesService extends AbstractLifecycleComponent Index index = metaData.getIndex(); if (hasIndex(index)) { String localUUid = indexService(index).indexUUID(); - throw new IllegalStateException("Can't delete index store for [" + index.getName() + "] - it's still part of the indices service [" + localUUid + "] [" + metaData.getIndexUUID() + "]"); + throw new IllegalStateException("Can't delete index store for [" + index.getName() + + "] - it's still part of the indices service [" + localUUid + "] [" + metaData.getIndexUUID() + "]"); } if (clusterState.metaData().hasIndex(index.getName()) && (clusterState.nodes().getLocalNode().isMasterNode() == true)) { @@ -752,7 +764,8 @@ public class IndicesService extends AbstractLifecycleComponent } success = true; } catch (LockObtainFailedException ex) { - logger.debug(() -> new ParameterizedMessage("{} failed to delete index store - at least one shards is still locked", index), ex); + logger.debug(() -> new ParameterizedMessage("{} failed to delete index store - at least one shards is still locked", index), + ex); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("{} failed to delete index", index), ex); } finally { @@ -784,7 +797,8 @@ public class IndicesService extends AbstractLifecycleComponent * is prevented by {@link #canDeleteShardContent(ShardId, IndexSettings)} * of if the shards lock can not be acquired. * - * On data nodes, if the deleted shard is the last shard folder in its index, the method will attempt to remove the index folder as well. + * On data nodes, if the deleted shard is the last shard folder in its index, the method will attempt to remove + * the index folder as well. * * @param reason the reason for the shard deletion * @param shardId the shards ID to delete @@ -803,7 +817,8 @@ public class IndicesService extends AbstractLifecycleComponent nodeEnv.deleteShardDirectorySafe(shardId, indexSettings); logger.debug("{} deleted shard reason [{}]", shardId, reason); - if (clusterState.nodes().getLocalNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards.. + // master nodes keep the index meta data, even if having no shards.. + if (clusterState.nodes().getLocalNode().isMasterNode() == false && canDeleteIndexContents(shardId.getIndex(), indexSettings)) { if (nodeEnv.findAllShardIds(shardId.getIndex()).isEmpty()) { try { @@ -857,7 +872,8 @@ public class IndicesService extends AbstractLifecycleComponent try { metaData = metaStateService.loadIndexState(index); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to load state file from a stale deleted index, folders will be left on disk", index), e); + logger.warn(() -> new ParameterizedMessage("[{}] failed to load state file from a stale deleted index, " + + "folders will be left on disk", index), e); return null; } final IndexSettings indexSettings = buildIndexSettings(metaData); @@ -1089,8 +1105,9 @@ public class IndicesService extends AbstractLifecycleComponent } /** - * Checks if all pending deletes have completed. Used by tests to ensure we don't check directory contents while deletion still ongoing. - * The reason is that, on Windows, browsing the directory contents can interfere with the deletion process and delay it unnecessarily. + * Checks if all pending deletes have completed. Used by tests to ensure we don't check directory contents + * while deletion still ongoing. * The reason is that, on Windows, browsing the directory contents can interfere + * with the deletion process and delay it unnecessarily. */ public boolean hasUncompletedPendingDeletes() { return numUncompletedDeletes.get() > 0; @@ -1115,7 +1132,11 @@ public class IndicesService extends AbstractLifecycleComponent private final AtomicBoolean closed = new AtomicBoolean(false); private final IndicesRequestCache requestCache; - CacheCleaner(IndicesFieldDataCache cache, IndicesRequestCache requestCache, Logger logger, ThreadPool threadPool, TimeValue interval) { + CacheCleaner(IndicesFieldDataCache cache, + IndicesRequestCache requestCache, + Logger logger, + ThreadPool threadPool, + TimeValue interval) { this.cache = cache; this.requestCache = requestCache; this.logger = logger; @@ -1135,7 +1156,8 @@ public class IndicesService extends AbstractLifecycleComponent logger.warn("Exception during periodic field data cache cleanup:", e); } if (logger.isTraceEnabled()) { - logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)); + logger.trace("periodic field data cache cleanup finished in {} milliseconds", + TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)); } try { diff --git a/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java b/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java index 22b5a8ffaf4..38ae484a42b 100644 --- a/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java +++ b/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java @@ -33,7 +33,8 @@ public class PreBuiltCacheFactory { * * ONE Exactly one version is stored. Useful for analyzers which do not store version information * LUCENE Exactly one version for each lucene version is stored. Useful to prevent different analyzers with the same version - * ELASTICSEARCH Exactly one version per elasticsearch version is stored. Useful if you change an analyzer between elasticsearch releases, when the lucene version does not change + * ELASTICSEARCH Exactly one version per elasticsearch version is stored. Useful if you change an analyzer between elasticsearch + * releases, when the lucene version does not change */ public enum CachingStrategy { ONE, LUCENE, ELASTICSEARCH }; diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index b87de10657b..5aa6f2f39a6 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -146,11 +146,16 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { registerBreaker(this.inFlightRequestsSettings); registerBreaker(this.accountingSettings); - clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setRequestBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setAccountingBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, + this::validateTotalCircuitBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + this::setFieldDataBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, + IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, + this::setRequestBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING, + this::setAccountingBreakerLimit); } private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) { @@ -162,16 +167,19 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { } private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) { - BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, newInFlightRequestsMax.getBytes(), - newInFlightRequestsOverhead, this.inFlightRequestsSettings.getType(), this.inFlightRequestsSettings.getDurability()); + BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, + newInFlightRequestsMax.getBytes(), newInFlightRequestsOverhead, this.inFlightRequestsSettings.getType(), + this.inFlightRequestsSettings.getDurability()); registerBreaker(newInFlightRequestsSettings); this.inFlightRequestsSettings = newInFlightRequestsSettings; logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings); } private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) { - long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.getBytes(); - newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; + long newFielddataLimitBytes = newFielddataMax == null ? + HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.getBytes(); + newFielddataOverhead = newFielddataOverhead == null ? + HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; BreakerSettings newFielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead, this.fielddataSettings.getType(), this.fielddataSettings.getDurability()); registerBreaker(newFielddataSettings); @@ -181,20 +189,23 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private void setAccountingBreakerLimit(ByteSizeValue newAccountingMax, Double newAccountingOverhead) { BreakerSettings newAccountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING, newAccountingMax.getBytes(), - newAccountingOverhead, HierarchyCircuitBreakerService.this.accountingSettings.getType(), this.accountingSettings.getDurability()); + newAccountingOverhead, HierarchyCircuitBreakerService.this.accountingSettings.getType(), + this.accountingSettings.getDurability()); registerBreaker(newAccountingSettings); HierarchyCircuitBreakerService.this.accountingSettings = newAccountingSettings; logger.info("Updated breaker settings for accounting requests: {}", newAccountingSettings); } private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { - BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, CircuitBreaker.Type.PARENT, null); + BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, + CircuitBreaker.Type.PARENT, null); validateSettings(new BreakerSettings[]{newParentSettings}); return true; } private void setTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { - BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, CircuitBreaker.Type.PARENT, null); + BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, + CircuitBreaker.Type.PARENT, null); this.parentSettings = newParentSettings; } @@ -236,7 +247,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { @Override public CircuitBreakerStats stats(String name) { CircuitBreaker breaker = this.breakers.get(name); - return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); + return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), + breaker.getTrippedCount()); } private static class MemoryUsage { diff --git a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java index 04564943ca7..a2d9d594326 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -63,7 +63,10 @@ public class ShardsSyncedFlushResult implements Streamable { /** * success constructor */ - public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map shardResponses) { + public ShardsSyncedFlushResult(ShardId shardId, + String syncId, + int totalShards, + Map shardResponses) { this.failureReason = null; this.shardResponses = unmodifiableMap(new HashMap<>(shardResponses)); this.syncId = syncId; diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index d23e0db84d0..e1cd85faaef 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -86,14 +86,20 @@ public class SyncedFlushService implements IndexEventListener { private final IndexNameExpressionResolver indexNameExpressionResolver; @Inject - public SyncedFlushService(IndicesService indicesService, ClusterService clusterService, TransportService transportService, IndexNameExpressionResolver indexNameExpressionResolver) { + public SyncedFlushService(IndicesService indicesService, + ClusterService clusterService, + TransportService transportService, + IndexNameExpressionResolver indexNameExpressionResolver) { this.indicesService = indicesService; this.clusterService = clusterService; this.transportService = transportService; this.indexNameExpressionResolver = indexNameExpressionResolver; - transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler()); - transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler()); - transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler()); + transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, + new PreSyncedFlushTransportHandler()); + transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, + new SyncedFlushTransportHandler()); + transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, + new InFlightOpCountTransportHandler()); } @Override @@ -103,7 +109,8 @@ public class SyncedFlushService implements IndexEventListener { attemptSyncedFlush(indexShard.shardId(), new ActionListener() { @Override public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { - logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); + logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", + syncedFlushResult.getShardId(), syncedFlushResult.syncId()); } @Override @@ -115,10 +122,13 @@ public class SyncedFlushService implements IndexEventListener { } /** - * a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)} + * a utility method to perform a synced flush for all shards of multiple indices. + * see {@link #attemptSyncedFlush(ShardId, ActionListener)} * for more details. */ - public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener listener) { + public void attemptSyncedFlush(final String[] aliasesOrIndices, + IndicesOptions indicesOptions, + final ActionListener listener) { final ClusterState state = clusterService.state(); final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices); final Map> results = ConcurrentCollections.newConcurrentMap(); @@ -176,12 +186,14 @@ public class SyncedFlushService implements IndexEventListener { * a) the shard has no uncommitted changes since the last flush * b) the last flush was the one executed in 1 (use the collected commit id to verify this) * - * This alone is not enough to ensure that all copies contain the same documents. Without step 2 a sync id would be written for inconsistent copies in the following scenario: + * This alone is not enough to ensure that all copies contain the same documents. + * Without step 2 a sync id would be written for inconsistent copies in the following scenario: * - * Write operation has completed on a primary and is being sent to replicas. The write request does not reach the replicas until sync flush is finished. + * Write operation has completed on a primary and is being sent to replicas. The write request does not reach the + * replicas until sync flush is finished. * Step 1 is executed. After the flush the commit points on primary contains a write operation that the replica does not have. - * Step 3 will be executed on primary and replica as well because there are no uncommitted changes on primary (the first flush committed them) and there are no uncommitted - * changes on the replica (the write operation has not reached the replica yet). + * Step 3 will be executed on primary and replica as well because there are no uncommitted changes on primary (the first flush + * committed them) and there are no uncommitted changes on the replica (the write operation has not reached the replica yet). * * Step 2 detects this scenario and fails the whole synced flush if a write operation is ongoing on the primary. * Together with the conditions for step 3 (same commit id and no uncommitted changes) this guarantees that a snc id will only @@ -194,7 +206,9 @@ public class SyncedFlushService implements IndexEventListener { innerAttemptSyncedFlush(shardId, clusterService.state(), actionListener); } - private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState state, final ActionListener actionListener) { + private void innerAttemptSyncedFlush(final ShardId shardId, + final ClusterState state, + final ActionListener actionListener) { try { final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); @@ -205,11 +219,13 @@ public class SyncedFlushService implements IndexEventListener { return; } - final ActionListener> presyncListener = new ActionListener>() { + final ActionListener> presyncListener = + new ActionListener>() { @Override public void onResponse(final Map presyncResponses) { if (presyncResponses.isEmpty()) { - actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync")); + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, + "all shards failed to commit on pre-sync")); return; } final ActionListener inflightOpsListener = new ActionListener() { @@ -218,14 +234,17 @@ public class SyncedFlushService implements IndexEventListener { final int inflight = response.opCount(); assert inflight >= 0; if (inflight != 0) { - actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary")); + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + + "] ongoing operations on primary")); } else { // 3. now send the sync request to all the shards; final String sharedSyncId = sharedExistingSyncId(presyncResponses); if (sharedSyncId != null) { assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) : - "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]"; - reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener); + "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + + presyncResponses + "]"; + reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, + presyncResponses, actionListener); }else { String syncId = UUIDs.randomBase64UUID(); sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener); @@ -271,8 +290,12 @@ public class SyncedFlushService implements IndexEventListener { return existingSyncId; } - private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List shards, int totalShards, - Map preSyncResponses, ActionListener listener) { + private void reportSuccessWithExistingSyncId(ShardId shardId, + String existingSyncId, + List shards, + int totalShards, + Map preSyncResponses, + ActionListener listener) { final Map results = new HashMap<>(); for (final ShardRouting shard : shards) { if (preSyncResponses.containsKey(shard.currentNodeId())) { @@ -301,7 +324,10 @@ public class SyncedFlushService implements IndexEventListener { /** * returns the number of in flight operations on primary. -1 upon error. */ - protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { + protected void getInflightOpsCount(final ShardId shardId, + ClusterState state, + IndexShardRoutingTable shardRoutingTable, + final ActionListener listener) { try { final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId()); @@ -353,8 +379,13 @@ public class SyncedFlushService implements IndexEventListener { return PreSyncedFlushResponse.UNKNOWN_NUM_DOCS; } - void sendSyncRequests(final String syncId, final List shards, ClusterState state, Map preSyncResponses, - final ShardId shardId, final int totalShards, final ActionListener listener) { + void sendSyncRequests(final String syncId, + final List shards, + ClusterState state, + Map preSyncResponses, + final ShardId shardId, + final int totalShards, + final ActionListener listener) { final CountDown countDown = new CountDown(shards.size()); final Map results = ConcurrentCollections.newConcurrentMap(); final int numDocsOnPrimary = numDocsOnPrimary(shards, preSyncResponses); @@ -368,13 +399,15 @@ public class SyncedFlushService implements IndexEventListener { } final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId()); if (preSyncedResponse == null) { - logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); + logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", + shardId, syncId, shard); results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush")); countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); continue; } - if (preSyncedResponse.numDocs != numDocsOnPrimary - && preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) { + if (preSyncedResponse.numDocs != numDocsOnPrimary && + preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && + numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) { logger.warn("{} can't to issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]", shardId, syncId, shard, preSyncedResponse.numDocs, numDocsOnPrimary); results.put(shard, new ShardSyncedFlushResponse("out of sync replica; " + @@ -383,7 +416,8 @@ public class SyncedFlushService implements IndexEventListener { continue; } logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId); - transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId), + ShardSyncedFlushRequest syncedFlushRequest = new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId); + transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, syncedFlushRequest, new TransportResponseHandler() { @Override public ShardSyncedFlushResponse read(StreamInput in) throws IOException { @@ -402,7 +436,8 @@ public class SyncedFlushService implements IndexEventListener { @Override public void handleException(TransportException exp) { - logger.trace(() -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", shardId, shard), exp); + logger.trace(() -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", + shardId, shard), exp); results.put(shard, new ShardSyncedFlushResponse(exp.getMessage())); countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); } @@ -416,8 +451,13 @@ public class SyncedFlushService implements IndexEventListener { } - private void countDownAndSendResponseIfDone(String syncId, List shards, ShardId shardId, int totalShards, - ActionListener listener, CountDown countDown, Map results) { + private void countDownAndSendResponseIfDone(String syncId, + List shards, + ShardId shardId, + int totalShards, + ActionListener listener, + CountDown countDown, + Map results) { if (countDown.countDown()) { assert results.size() == shards.size(); listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results)); @@ -427,7 +467,10 @@ public class SyncedFlushService implements IndexEventListener { /** * send presync requests to all started copies of the given shard */ - void sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId, final ActionListener> listener) { + void sendPreSyncRequests(final List shards, + final ClusterState state, + final ShardId shardId, + final ActionListener> listener) { final CountDown countDown = new CountDown(shards.size()); final ConcurrentMap presyncResponses = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { @@ -440,7 +483,8 @@ public class SyncedFlushService implements IndexEventListener { } continue; } - transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler() { + transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), + new TransportResponseHandler() { @Override public PreSyncedFlushResponse read(StreamInput in) throws IOException { PreSyncedFlushResponse response = new PreSyncedFlushResponse(); @@ -460,7 +504,8 @@ public class SyncedFlushService implements IndexEventListener { @Override public void handleException(TransportException exp) { - logger.trace(() -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", shardId, shard), exp); + logger.trace(() -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", + shardId, shard), exp); if (countDown.countDown()) { listener.onResponse(presyncResponses); } @@ -488,7 +533,8 @@ public class SyncedFlushService implements IndexEventListener { private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().id()); - logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId()); + logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", + request.shardId(), request.syncId(), request.expectedCommitId()); Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId()); logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result); switch (result) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index a8abce4382e..69af8841f4b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -69,7 +69,8 @@ public class PeerRecoverySourceService implements IndexEventListener { this.transportService = transportService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; - transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler()); + transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, + new StartRecoveryTransportRequestHandler()); } @Override @@ -90,13 +91,16 @@ public class PeerRecoverySourceService implements IndexEventListener { throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary"); } - if (request.isPrimaryRelocation() && (routingEntry.relocating() == false || routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) { - logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}", request.shardId(), request.targetNode()); + if (request.isPrimaryRelocation() && (routingEntry.relocating() == false || + routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) { + logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}", + request.shardId(), request.targetNode()); throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]"); } RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard); - logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode()); + logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), + request.targetNode()); try { return handler.recoverToTarget(); } finally { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFailedException.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFailedException.java index 49e8c856fd4..085a7d392ca 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFailedException.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFailedException.java @@ -45,7 +45,11 @@ public class RecoveryFailedException extends ElasticsearchException { this(shardId, sourceNode, targetNode, null, cause); } - public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, @Nullable String extraInfo, Throwable cause) { + public RecoveryFailedException(ShardId shardId, + DiscoveryNode sourceNode, + DiscoveryNode targetNode, + @Nullable String extraInfo, + Throwable cause) { super(shardId + ": Recovery failed " + (sourceNode != null ? "from " + sourceNode + " into " : "on ") + targetNode + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index e9799b28ac9..31ecd4455b1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -111,7 +111,8 @@ public class RecoverySettings { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, this::setInternalActionLongTimeout); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, + this::setInternalActionLongTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 0d57d950662..9013cfa202d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -467,18 +467,22 @@ public class RecoveryState implements ToXContentFragment, Streamable { public synchronized void incrementRecoveredOperations() { recovered++; - assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + + "], recovered [" + recovered + "]"; } public synchronized void incrementRecoveredOperations(int ops) { recovered += ops; - assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + + "], recovered [" + recovered + "]"; } public synchronized void decrementRecoveredOperations(int ops) { recovered -= ops; - assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + "] after decrementing [" + ops + "]"; - assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; + assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + + "] after decrementing [" + ops + "]"; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + + total + "], recovered [" + recovered + "]"; } @@ -501,7 +505,8 @@ public class RecoveryState implements ToXContentFragment, Streamable { public synchronized void totalOperations(int total) { this.total = total; - assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + + "], recovered [" + recovered + "]"; } /** diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 4967c43675a..d9252bb7013 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -75,7 +75,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { private static final Logger logger = LogManager.getLogger(IndicesStore.class); - // TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService there is no need for a separate public service + // TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService + // there is no need for a separate public service public static final Setting INDICES_STORE_DELETE_SHARD_TIMEOUT = Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS), Property.NodeScope); @@ -100,7 +101,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { this.clusterService = clusterService; this.transportService = transportService; this.threadPool = threadPool; - transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler()); + transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, + new ShardActiveRequestHandler()); this.deleteShardTimeout = INDICES_STORE_DELETE_SHARD_TIMEOUT.get(settings); // Doesn't make sense to delete shards on non-data nodes if (DiscoveryNode.isDataNode(settings)) { @@ -161,7 +163,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { } else { indexSettings = indexService.getIndexSettings(); } - IndicesService.ShardDeletionCheckResult shardDeletionCheckResult = indicesService.canDeleteShardContent(shardId, indexSettings); + IndicesService.ShardDeletionCheckResult shardDeletionCheckResult = + indicesService.canDeleteShardContent(shardId, indexSettings); switch (shardDeletionCheckResult) { case FOLDER_FOUND_CAN_DELETE: deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); @@ -214,7 +217,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { for (ShardRouting shardRouting : indexShardRoutingTable) { assert shardRouting.started() : "expected started shard but was " + shardRouting; DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId()); - requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout))); + requests.add(new Tuple<>(currentNode, + new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout))); } ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state.getVersion(), @@ -273,20 +277,23 @@ public class IndicesStore implements ClusterStateListener, Closeable { private void allNodesResponded() { if (activeCopies.get() != expectedActiveCopies) { - logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); + logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies", + shardId, expectedActiveCopies, activeCopies.get()); return; } ClusterState latestClusterState = clusterService.state(); if (clusterStateVersion != latestClusterState.getVersion()) { - logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterStateVersion); + logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state " + + "before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterStateVersion); return; } clusterService.getClusterApplierService().runOnApplierThread("indices_store ([" + shardId + "] active fully on other nodes)", currentState -> { if (clusterStateVersion != currentState.getVersion()) { - logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterStateVersion); + logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before " + + "shard active api call [{}]", shardId, currentState.getVersion(), clusterStateVersion); return; } try { @@ -295,7 +302,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { logger.debug(() -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex); } }, - (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e) + (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", + shardId), e) ); } @@ -314,10 +322,11 @@ public class IndicesStore implements ClusterStateListener, Closeable { // create observer here. we need to register it here because we need to capture the current cluster state // which will then be compared to the one that is applied when we call waitForNextChange(). if we create it // later we might miss an update and wait forever in case no new cluster state comes in. - // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly. - // instead we wait for the cluster state changes because we know any shard state change will trigger or be - // triggered by a cluster state change. - ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext()); + // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on + // shard state changes explicitly. instead we wait for the cluster state changes because we know any + // shard state change will trigger or be triggered by a cluster state change. + ClusterStateObserver observer = + new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext()); // check if shard is active. if so, all is good boolean shardActive = shardActive(indexShard); if (shardActive) { @@ -344,16 +353,19 @@ public class IndicesStore implements ClusterStateListener, Closeable { try { channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode())); } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", request.shardId), e); + logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to " + + "delete shard {} - shard will probably not be removed", request.shardId), e); } catch (EsRejectedExecutionException e) { - logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", request.shardId), e); + logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to " + + "delete shard {} - shard will probably not be removed", request.shardId), e); } } }, newState -> { - // the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified - // or the shard is active in which case we want to send back that the shard is active - // here we could also evaluate the cluster state and get the information from there. we - // don't do it because we would have to write another method for this that would have the same effect + // the shard is not there in which case we want to send back a false (shard is not active), + // so the cluster state listener must be notified or the shard is active in which case we want to + // send back that the shard is active here we could also evaluate the cluster state and get the + // information from there. we don't do it because we would have to write another method for this + // that would have the same effect IndexShard currentShard = getShard(request); return currentShard == null || shardActive(currentShard); }); @@ -371,7 +383,8 @@ public class IndicesStore implements ClusterStateListener, Closeable { private IndexShard getShard(ShardActiveRequest request) { ClusterName thisClusterName = clusterService.getClusterName(); if (!thisClusterName.equals(request.clusterName)) { - logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName); + logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", + request.clusterName, thisClusterName); return null; } ShardId shardId = request.shardId; diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 68c033f188d..5ebbdab3983 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -140,7 +140,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction { @@ -246,7 +252,10 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { Settings afterCloseSettings = Settings.EMPTY; @Override - public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) { + public void indexShardStateChanged(IndexShard indexShard, + @Nullable IndexShardState previousState, + IndexShardState newState, + @Nullable String reason) { List shardStates = this.shardStates.putIfAbsent(indexShard.shardId(), new CopyOnWriteArrayList<>(new IndexShardState[]{newState})); if (shardStates != null) { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java index 0a03343cc1d..ab9cf4bcef2 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java @@ -171,7 +171,8 @@ public class IndicesOptionsIntegrationIT extends ESIntegTestCase { verify(getMapping("test1").setIndicesOptions(options), true); verify(getSettings("test1").setIndicesOptions(options), true); - options = IndicesOptions.fromOptions(true, options.allowNoIndices(), options.expandWildcardsOpen(), options.expandWildcardsClosed(), options); + options = IndicesOptions.fromOptions(true, options.allowNoIndices(), options.expandWildcardsOpen(), + options.expandWildcardsClosed(), options); verify(search("test1").setIndicesOptions(options), false); verify(msearch(options, "test1"), false); verify(clearCache("test1").setIndicesOptions(options), false); @@ -227,7 +228,8 @@ public class IndicesOptionsIntegrationIT extends ESIntegTestCase { verify(getMapping("test1").setIndicesOptions(options), true); verify(getSettings("test1").setIndicesOptions(options), true); - options = IndicesOptions.fromOptions(true, options.allowNoIndices(), options.expandWildcardsOpen(), options.expandWildcardsClosed(), options); + options = IndicesOptions.fromOptions(true, options.allowNoIndices(), options.expandWildcardsOpen(), + options.expandWildcardsClosed(), options); verify(search("test1").setIndicesOptions(options), false); verify(msearch(options, "test1"), false); verify(clearCache("test1").setIndicesOptions(options), false); diff --git a/server/src/test/java/org/elasticsearch/indices/analyze/AnalyzeActionIT.java b/server/src/test/java/org/elasticsearch/indices/analyze/AnalyzeActionIT.java index 2094c20c890..4511c59c6b3 100644 --- a/server/src/test/java/org/elasticsearch/indices/analyze/AnalyzeActionIT.java +++ b/server/src/test/java/org/elasticsearch/indices/analyze/AnalyzeActionIT.java @@ -97,11 +97,13 @@ public class AnalyzeActionIT extends ESIntegTestCase { AnalyzeResponse analyzeResponse = client().admin().indices().prepareAnalyze("THIS IS A TEST").setAnalyzer("simple").get(); assertThat(analyzeResponse.getTokens().size(), equalTo(4)); - analyzeResponse = client().admin().indices().prepareAnalyze("THIS IS A TEST").setTokenizer("keyword").addTokenFilter("lowercase").get(); + analyzeResponse = client().admin().indices().prepareAnalyze("THIS IS A TEST").setTokenizer("keyword").addTokenFilter("lowercase") + .get(); assertThat(analyzeResponse.getTokens().size(), equalTo(1)); assertThat(analyzeResponse.getTokens().get(0).getTerm(), equalTo("this is a test")); - analyzeResponse = client().admin().indices().prepareAnalyze("THIS IS A TEST").setTokenizer("standard").addTokenFilter("lowercase").get(); + analyzeResponse = client().admin().indices().prepareAnalyze("THIS IS A TEST").setTokenizer("standard").addTokenFilter("lowercase") + .get(); assertThat(analyzeResponse.getTokens().size(), equalTo(4)); AnalyzeResponse.AnalyzeToken token = analyzeResponse.getTokens().get(0); assertThat(token.getTerm(), equalTo("this")); diff --git a/server/src/test/java/org/elasticsearch/indices/exists/types/TypesExistsIT.java b/server/src/test/java/org/elasticsearch/indices/exists/types/TypesExistsIT.java index 301c7a17f90..294df496032 100644 --- a/server/src/test/java/org/elasticsearch/indices/exists/types/TypesExistsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/exists/types/TypesExistsIT.java @@ -89,7 +89,8 @@ public class TypesExistsIT extends ESIntegTestCase { for (String block : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY)) { try { enableIndexBlock("ro", block); - assertThat(client().admin().indices().prepareTypesExists("ro").setTypes("type1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().admin().indices().prepareTypesExists("ro").setTypes("type1").execute().actionGet().isExists(), + equalTo(true)); } finally { disableIndexBlock("ro", block); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index ea23ae6308e..a42993f5e2d 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -80,7 +80,8 @@ public class FlushIT extends ESIntegTestCase { public void onResponse(FlushResponse flushResponse) { try { // don't use assertAllSuccessful it uses a randomized context that belongs to a different thread - assertThat("Unexpected ShardFailures: " + Arrays.toString(flushResponse.getShardFailures()), flushResponse.getFailedShards(), equalTo(0)); + assertThat("Unexpected ShardFailures: " + Arrays.toString(flushResponse.getShardFailures()), + flushResponse.getFailedShards(), equalTo(0)); latch.countDown(); } catch (Exception ex) { onFailure(ex); @@ -138,7 +139,8 @@ public class FlushIT extends ESIntegTestCase { ShardRouting shardRouting = clusterState.getRoutingTable().index("test").shard(0).iterator().next(); String currentNodeName = clusterState.nodes().resolveNode(shardRouting.currentNodeId()).getName(); assertFalse(currentNodeName.equals(newNodeName)); - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, currentNodeName, newNodeName)).get(); + internalCluster().client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand("test", 0, currentNodeName, newNodeName)).get(); client().admin().cluster().prepareHealth() .setWaitForNoRelocatingShards(true) @@ -148,13 +150,15 @@ public class FlushIT extends ESIntegTestCase { assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build()).get(); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build()).get(); ensureGreen("test"); indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); for (ShardStats shardStats : indexStats.getShards()) { assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1).build()).get(); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1).build()).get(); ensureGreen("test"); indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); for (ShardStats shardStats : indexStats.getShards()) { @@ -168,7 +172,10 @@ public class FlushIT extends ESIntegTestCase { createIndex("test"); client().admin().indices().prepareUpdateSettings("test").setSettings( - Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1)) + Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) + .put("index.refresh_interval", -1) + .put("index.number_of_replicas", internalCluster().numDataNodes() - 1)) .get(); ensureGreen(); final AtomicBoolean stop = new AtomicBoolean(false); @@ -209,13 +216,16 @@ public class FlushIT extends ESIntegTestCase { for (final ShardStats shardStats : shardsStats) { for (final ShardsSyncedFlushResult shardResult : syncedFlushResults) { if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) { - for (Map.Entry singleResponse : shardResult.shardResponses().entrySet()) { + for (Map.Entry singleResponse : + shardResult.shardResponses().entrySet()) { if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) { if (singleResponse.getValue().success()) { - logger.info("{} sync flushed on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); + logger.info("{} sync flushed on node {}", singleResponse.getKey().shardId(), + singleResponse.getKey().currentNodeId()); assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } else { - logger.info("{} sync flush failed for on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); + logger.info("{} sync flush failed for on node {}", singleResponse.getKey().shardId(), + singleResponse.getKey().currentNodeId()); assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } } @@ -227,12 +237,15 @@ public class FlushIT extends ESIntegTestCase { public void testUnallocatedShardsDoesNotHang() throws InterruptedException { // create an index but disallow allocation - prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); + prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() + .put("index.routing.allocation.include._name", "nonexistent")).get(); // this should not hang but instead immediately return with empty result set - List shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test"); + List shardsResult = client().admin().indices().prepareSyncedFlush("test").get() + .getShardsResultPerIndex().get("test"); // just to make sure the test actually tests the right thing - int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1); + int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test") + .getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1); assertThat(shardsResult.size(), equalTo(numShards)); assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards")); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 83411ad2bc2..b9e0bd13f35 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -52,7 +52,8 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); + Map preSyncedResponses = + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, preSyncedResponses.size()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get(); String syncId = UUIDs.randomBase64UUID(); @@ -69,8 +70,9 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("pending operations", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't sync-flush with the old one - listener = new SyncedFlushUtil.LatchedListener(); + // pull another commit and make sure we can't sync-flush with the old one + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); + listener = new SyncedFlushUtil.LatchedListener<>(); flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); @@ -92,7 +94,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNull(listener.error); @@ -171,14 +173,15 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); + Map preSyncedResponses = + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, preSyncedResponses.size()); if (randomBoolean()) { client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get(); } client().admin().indices().prepareFlush("test").setForce(true).get(); String syncId = UUIDs.randomBase64UUID(); - final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); @@ -204,11 +207,12 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); + Map preSyncedResponses = + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, preSyncedResponses.size()); preSyncedResponses.clear(); // wipe it... String syncId = UUIDs.randomBase64UUID(); - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java index 8a8d57295a5..ffb494570a5 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java @@ -91,7 +91,10 @@ public class SyncedFlushUtil { /** * Blocking version of {@link SyncedFlushService#sendPreSyncRequests(List, ClusterState, ShardId, ActionListener)} */ - public static Map sendPreSyncRequests(SyncedFlushService service, List activeShards, ClusterState state, ShardId shardId) { + public static Map sendPreSyncRequests(SyncedFlushService service, + List activeShards, + ClusterState state, + ShardId shardId) { LatchedListener> listener = new LatchedListener<>(); service.sendPreSyncRequests(activeShards, state, shardId, listener); try { diff --git a/server/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateIT.java b/server/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateIT.java index 5d0a4f25b5c..0385e4b1046 100644 --- a/server/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateIT.java @@ -61,7 +61,8 @@ public class ConcurrentDynamicTemplateIT extends ESIntegTestCase { for (int j = 0; j < numDocs; j++) { Map source = new HashMap<>(); source.put(fieldName, "test-user"); - client().prepareIndex("test", mappingType, Integer.toString(currentID++)).setSource(source).execute(new ActionListener() { + client().prepareIndex("test", mappingType, Integer.toString(currentID++)).setSource(source).execute( + new ActionListener() { @Override public void onResponse(IndexResponse response) { latch.countDown(); diff --git a/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsIT.java b/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsIT.java index 788777ade7d..0e2e230053c 100644 --- a/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsIT.java @@ -100,7 +100,8 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase { // Get mappings by full name - GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA").setFields("field1", "obj.subfield").get(); + GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA") + .setFields("field1", "obj.subfield").get(); assertThat(response.fieldMappings("indexa", "typeA", "field1").fullName(), equalTo("field1")); assertThat(response.fieldMappings("indexa", "typeA", "field1").sourceAsMap(), hasKey("field1")); assertThat(response.fieldMappings("indexa", "typeA", "obj.subfield").fullName(), equalTo("obj.subfield")); @@ -108,7 +109,8 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase { assertThat(response.fieldMappings("indexb", "typeB", "field1"), nullValue()); // Get mappings by name - response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA").setFields("field1", "obj.subfield").get(); + response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA").setFields("field1", "obj.subfield") + .get(); assertThat(response.fieldMappings("indexa", "typeA", "field1").fullName(), equalTo("field1")); assertThat(response.fieldMappings("indexa", "typeA", "field1").sourceAsMap(), hasKey("field1")); assertThat(response.fieldMappings("indexa", "typeA", "obj.subfield").fullName(), equalTo("obj.subfield")); @@ -148,13 +150,19 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase { client().prepareIndex("test", "type", "1").setSource("num", 1).get(); - GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings().setFields("num", "field1", "obj.subfield").includeDefaults(true).get(); + GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings() + .setFields("num", "field1", "obj.subfield").includeDefaults(true).get(); - assertThat((Map) response.fieldMappings("test", "type", "num").sourceAsMap().get("num"), hasEntry("index", Boolean.TRUE)); - assertThat((Map) response.fieldMappings("test", "type", "num").sourceAsMap().get("num"), hasEntry("type", (Object) "long")); - assertThat((Map) response.fieldMappings("test", "type", "field1").sourceAsMap().get("field1"), hasEntry("index", Boolean.TRUE)); - assertThat((Map) response.fieldMappings("test", "type", "field1").sourceAsMap().get("field1"), hasEntry("type", (Object) "text")); - assertThat((Map) response.fieldMappings("test", "type", "obj.subfield").sourceAsMap().get("subfield"), hasEntry("type", (Object) "keyword")); + assertThat((Map) response.fieldMappings("test", "type", "num").sourceAsMap().get("num"), + hasEntry("index", Boolean.TRUE)); + assertThat((Map) response.fieldMappings("test", "type", "num").sourceAsMap().get("num"), + hasEntry("type", "long")); + assertThat((Map) response.fieldMappings("test", "type", "field1").sourceAsMap().get("field1"), + hasEntry("index", Boolean.TRUE)); + assertThat((Map) response.fieldMappings("test", "type", "field1").sourceAsMap().get("field1"), + hasEntry("type", "text")); + assertThat((Map) response.fieldMappings("test", "type", "obj.subfield").sourceAsMap().get("subfield"), + hasEntry("type", "keyword")); } @SuppressWarnings("unchecked") @@ -179,7 +187,8 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase { assertAcked(prepareCreate("index").addMapping("type", getMappingForType("type"))); Map params = new HashMap<>(); params.put("pretty", "true"); - GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("index").setTypes("type").setFields("field1", "obj.subfield").get(); + GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("index") + .setTypes("type").setFields("field1", "obj.subfield").get(); XContentBuilder responseBuilder = XContentFactory.jsonBuilder().prettyPrint(); response.toXContent(responseBuilder, new ToXContent.MapParams(params)); String responseStrings = Strings.toString(responseBuilder); @@ -191,7 +200,8 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase { params.put("pretty", "false"); - response = client().admin().indices().prepareGetFieldMappings("index").setTypes("type").setFields("field1", "obj.subfield").get(); + response = client().admin().indices().prepareGetFieldMappings("index") + .setTypes("type").setFields("field1", "obj.subfield").get(); responseBuilder = XContentFactory.jsonBuilder().prettyPrint().lfAtEnd(); response.toXContent(responseBuilder, new ToXContent.MapParams(params)); responseStrings = Strings.toString(responseBuilder); diff --git a/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetMappingsIT.java b/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetMappingsIT.java index 545478dda7a..c52cd313a8e 100644 --- a/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetMappingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/mapping/SimpleGetMappingsIT.java @@ -70,7 +70,8 @@ public class SimpleGetMappingsIT extends ESIntegTestCase { .addMapping("typeA", getMappingForType("typeA")) .execute().actionGet(); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); // Get all mappings diff --git a/server/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingIntegrationIT.java index 2e54490ed78..c6dd9ec7c8f 100644 --- a/server/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingIntegrationIT.java @@ -230,7 +230,8 @@ public class UpdateMappingIntegrationIT extends ESIntegTestCase { GetMappingsResponse getMappingResponse = client2.admin().indices().prepareGetMappings(indexName).get(); ImmutableOpenMap mappings = getMappingResponse.getMappings().get(indexName); assertThat(mappings.containsKey(typeName), equalTo(true)); - assertThat(((Map) mappings.get(typeName).getSourceAsMap().get("properties")).keySet(), Matchers.hasItem(fieldName)); + assertThat(((Map) mappings.get(typeName).getSourceAsMap().get("properties")).keySet(), + Matchers.hasItem(fieldName)); } } catch (Exception e) { threadException.set(e); diff --git a/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java b/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java index 1c8eaf71f8a..d01e32d78ac 100644 --- a/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java +++ b/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java @@ -66,7 +66,8 @@ public class CircuitBreakerUnitTests extends ESTestCase { } public void testRegisterCustomBreaker() throws Exception { - CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); String customName = "custom"; BreakerSettings settings = new BreakerSettings(customName, 20, 1.0); service.registerBreaker(settings); diff --git a/server/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java b/server/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java index 31ee22200a2..f379b7ee522 100644 --- a/server/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java +++ b/server/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java @@ -146,12 +146,15 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase { for (int i = 0; i < numDocs; i++) { try { client().prepareIndex("test", "type", "" + i) - .setTimeout(TimeValue.timeValueSeconds(1)).setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i).get(); + .setTimeout(TimeValue.timeValueSeconds(1)) + .setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i) + .get(); } catch (ElasticsearchException ex) { } } logger.info("Start Refresh"); - RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().get(); // don't assert on failures here + // don't assert on failures here + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().get(); final boolean refreshFailed = refreshResponse.getShardFailures().length != 0 || refreshResponse.getFailedShards() != 0; logger.info("Refresh failed: [{}] numShardsFailed: [{}], shardFailuresLength: [{}], successfulShards: [{}], totalShards: [{}] ", refreshFailed, refreshResponse.getFailedShards(), refreshResponse.getShardFailures().length, @@ -188,7 +191,8 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase { // Since .cleanUp() is no longer called on cache clear, we need to call it on each node manually for (String node : internalCluster().getNodeNames()) { - final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesService.class, node).getIndicesFieldDataCache(); + final IndicesFieldDataCache fdCache = + internalCluster().getInstance(IndicesService.class, node).getIndicesFieldDataCache(); // Clean up the cache, ensuring that entries' listeners have been called fdCache.getCache().refresh(); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java index 7de5862b855..4e43f95d844 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -44,7 +44,8 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase { private static final int RELOCATION_COUNT = 15; - @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE") + @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.index.shard:TRACE," + + "org.elasticsearch.cluster.service:TRACE") public void testPrimaryRelocationWhileIndexing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); client().admin().indices().prepareCreate("test") @@ -71,7 +72,8 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase { ClusterState initialState = client().admin().cluster().prepareState().get().getState(); DiscoveryNode[] dataNodes = initialState.getNodes().getDataNodes().values().toArray(DiscoveryNode.class); - DiscoveryNode relocationSource = initialState.getNodes().getDataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId()); + DiscoveryNode relocationSource = initialState.getNodes().getDataNodes().get(initialState.getRoutingTable() + .shardRoutingTable("test", 0).primaryShard().currentNodeId()); for (int i = 0; i < RELOCATION_COUNT; i++) { DiscoveryNode relocationTarget = randomFrom(dataNodes); while (relocationTarget.equals(relocationSource)) { @@ -81,11 +83,13 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase { client().admin().cluster().prepareReroute() .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId())) .execute().actionGet(); - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> [iteration {}] relocation complete", i); relocationSource = relocationTarget; - if (indexingThread.isAlive() == false) { // indexing process aborted early, no need for more relocations as test has already failed + // indexing process aborted early, no need for more relocations as test has already failed + if (indexingThread.isAlive() == false) { break; } if (i > 0 && i % 5 == 0) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index c24ba2f8612..e3c9aaafc60 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -257,7 +257,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { final String nodeA = internalCluster().startNode(); logger.info("--> create index on node: {}", nodeA); - ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats().getStore().size(); + ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) + .getShards()[0].getStats().getStore().size(); logger.info("--> start node B"); final String nodeB = internalCluster().startNode(); @@ -292,14 +293,16 @@ public class IndexRecoveryIT extends ESIntegTestCase { List nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); assertThat(nodeBRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, Stage.DONE, null, nodeA); + assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, + Stage.DONE, null, nodeA); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, nodeA, nodeB); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); logger.info("--> request node recovery stats"); - NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); + NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); long nodeAThrottling = Long.MAX_VALUE; long nodeBThrottling = Long.MAX_VALUE; for (NodeStats nodeStats : statsResponse.getNodes()) { @@ -320,15 +323,18 @@ public class IndexRecoveryIT extends ESIntegTestCase { final long finalNodeAThrottling = nodeAThrottling; final long finalNodeBThrottling = nodeBThrottling; assertBusy(() -> { - NodesStatsResponse statsResponse1 = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); + NodesStatsResponse statsResponse1 = client().admin().cluster().prepareNodesStats().clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); assertThat(statsResponse1.getNodes(), hasSize(2)); for (NodeStats nodeStats : statsResponse1.getNodes()) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); if (nodeStats.getNode().getName().equals(nodeA)) { - assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling)); + assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), + greaterThan(finalNodeAThrottling)); } if (nodeStats.getNode().getName().equals(nodeB)) { - assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling)); + assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), + greaterThan(finalNodeBThrottling)); } } }); @@ -466,7 +472,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(REPO_NAME, SNAP_NAME) .setWaitForCompletion(true).setIndices(INDEX_NAME).get(); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(client().admin().cluster().prepareGetSnapshots(REPO_NAME).setSnapshots(SNAP_NAME).get() .getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); @@ -552,8 +559,10 @@ public class IndexRecoveryIT extends ESIntegTestCase { // start a master node internalCluster().startNode(nodeSettings); - final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); - final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + final String blueNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); assertThat(response.isTimedOut(), is(false)); @@ -596,14 +605,18 @@ public class IndexRecoveryIT extends ESIntegTestCase { final boolean dropRequests = randomBoolean(); logger.info("--> will {} between blue & red on [{}]", dropRequests ? "drop requests" : "break connection", recoveryActionToBlock); - MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); - MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); + MockTransportService blueMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); + MockTransportService redMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); TransportService redTransportService = internalCluster().getInstance(TransportService.class, redNodeName); TransportService blueTransportService = internalCluster().getInstance(TransportService.class, blueNodeName); final CountDownLatch requestBlocked = new CountDownLatch(1); - blueMockTransportService.addSendBehavior(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked)); - redMockTransportService.addSendBehavior(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked)); + blueMockTransportService.addSendBehavior(redTransportService, + new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked)); + redMockTransportService.addSendBehavior(blueTransportService, + new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked)); logger.info("--> starting recovery from blue to red"); client().admin().indices().prepareUpdateSettings(indexName).setSettings( @@ -659,14 +672,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { boolean primaryRelocation = randomBoolean(); final String indexName = "test"; final Settings nodeSettings = Settings.builder() - .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), TimeValue.timeValueMillis(randomIntBetween(0, 100))) + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), + TimeValue.timeValueMillis(randomIntBetween(0, 100))) .build(); TimeValue disconnectAfterDelay = TimeValue.timeValueMillis(randomIntBetween(0, 100)); // start a master node String masterNodeName = internalCluster().startMasterOnlyNode(nodeSettings); - final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); - final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + final String blueNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); client().admin().indices().prepareCreate(indexName) .setSettings( @@ -685,9 +701,12 @@ public class IndexRecoveryIT extends ESIntegTestCase { ensureSearchable(indexName); assertHitCount(client().prepareSearch(indexName).get(), numDocs); - MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNodeName); - MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); - MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); + MockTransportService masterTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, masterNodeName); + MockTransportService blueMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); + MockTransportService redMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); redMockTransportService.addSendBehavior(blueMockTransportService, new StubbableTransport.SendRequestBehavior() { private final AtomicInteger count = new AtomicInteger(); @@ -710,7 +729,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { } catch (InterruptedException e) { throw new RuntimeException(e); } - throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulation disconnect after successfully sending " + action + " request"); + throw new ConnectTransportException(connection.getNode(), + "DISCONNECT: simulation disconnect after successfully sending " + action + " request"); } else { connection.sendRequest(requestId, action, request, options); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 7791e51445a..af4dc59ca1a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -96,7 +96,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RecoverySourceHandlerTests extends ESTestCase { - private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()); + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", + Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()); private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java index 0d1a5928fb8..ed1ee770852 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -84,7 +84,8 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase { strings = Sets.newHashSet(status.store().directory().listAll()); assertTrue(strings.toString(), strings.contains("foo.bar")); assertFalse(strings.toString(), strings.contains(expectedFile)); - // we must fail the recovery because marking it as done will try to move the shard to POST_RECOVERY, which will fail because it's started + // we must fail the recovery because marking it as done will try to move the shard to POST_RECOVERY, + // which will fail because it's started status.fail(new RecoveryFailedException(status.state(), "end of test. OK.", null), false); } } diff --git a/server/src/test/java/org/elasticsearch/indices/settings/UpdateNumberOfReplicasIT.java b/server/src/test/java/org/elasticsearch/indices/settings/UpdateNumberOfReplicasIT.java index ec5d0cab96b..28d14ab36bd 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/UpdateNumberOfReplicasIT.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/UpdateNumberOfReplicasIT.java @@ -48,7 +48,8 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { logger.info("Creating index test"); assertAcked(prepareCreate("test", 2)); logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); NumShards numShards = getNumShards("test"); @@ -75,9 +76,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { final long settingsVersion = client().admin().cluster().prepareState().get().getState().metaData().index("test").getSettingsVersion(); logger.info("Increasing the number of replicas from 1 to 2"); - assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 2)).execute().actionGet()); + assertAcked(client().admin().indices().prepareUpdateSettings("test") + .setSettings(Settings.builder().put("index.number_of_replicas", 2)).execute().actionGet()); logger.info("Running Cluster Health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus() + .setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet(); logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); @@ -97,7 +100,8 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { client().admin().cluster().prepareState().get().getState().metaData().index("test").getSettingsVersion(); logger.info("Running Cluster Health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet(); logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -112,10 +116,12 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { } logger.info("Decreasing number of replicas from 2 to 0"); - assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 0)).get()); + assertAcked(client().admin().indices().prepareUpdateSettings("test"). + setSettings(Settings.builder().put("index.number_of_replicas", 0)).get()); logger.info("Running Cluster Health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet(); logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -141,7 +147,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { NumShards numShards = getNumShards("test"); logger.info("--> running cluster health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 2) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -156,7 +166,12 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 3); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 3).setWaitForNodes(">=3").execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 3) + .setWaitForNodes(">=3") + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -173,7 +188,12 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 2); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 2).setWaitForNodes(">=2").execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 2) + .setWaitForNodes(">=2") + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -190,7 +210,12 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 1); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes(">=1").setWaitForActiveShards(numShards.numPrimaries).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(">=1") + .setWaitForActiveShards(numShards.numPrimaries) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -211,7 +236,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { NumShards numShards = getNumShards("test"); logger.info("--> running cluster health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 2) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -225,7 +254,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 3); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 3).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 3) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -242,7 +275,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 2); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes(">=2").setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(">=2") + .setWaitForActiveShards(numShards.numPrimaries * 2) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -259,7 +296,12 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { allowNodes("test", 1); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNodes(">=1").setWaitForActiveShards(numShards.numPrimaries).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForYellowStatus() + .setWaitForNodes(">=1") + .setWaitForActiveShards(numShards.numPrimaries) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); @@ -275,7 +317,11 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { NumShards numShards = getNumShards("test"); logger.info("--> running cluster health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 3).execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 3) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -290,10 +336,16 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase { final long settingsVersion = client().admin().cluster().prepareState().get().getState().metaData().index("test").getSettingsVersion(); logger.info("--> update the auto expand replicas to 0-3"); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("auto_expand_replicas", "0-3")).execute().actionGet(); + client().admin().indices().prepareUpdateSettings("test") + .setSettings(Settings.builder().put("auto_expand_replicas", "0-3")) + .execute().actionGet(); logger.info("--> running cluster health"); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForActiveShards(numShards.numPrimaries * 4).execute().actionGet(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForActiveShards(numShards.numPrimaries * 4) + .execute().actionGet(); logger.info("--> done cluster health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); diff --git a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java index f37dde7ec09..05b2ae1b9cf 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java @@ -264,7 +264,8 @@ public class OpenCloseIndexIT extends ESIntegTestCase { ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); - AcknowledgedResponse aliasesResponse = client.admin().indices().prepareAliases().addAlias("test1", "test1-alias").execute().actionGet(); + AcknowledgedResponse aliasesResponse = client.admin().indices().prepareAliases().addAlias("test1", "test1-alias") + .execute().actionGet(); assertThat(aliasesResponse.isAcknowledged(), equalTo(true)); AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test1-alias").execute().actionGet(); @@ -283,9 +284,11 @@ public class OpenCloseIndexIT extends ESIntegTestCase { ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); - AcknowledgedResponse aliasesResponse1 = client.admin().indices().prepareAliases().addAlias("test1", "test-alias").execute().actionGet(); + AcknowledgedResponse aliasesResponse1 = client.admin().indices().prepareAliases().addAlias("test1", "test-alias") + .execute().actionGet(); assertThat(aliasesResponse1.isAcknowledged(), equalTo(true)); - AcknowledgedResponse aliasesResponse2 = client.admin().indices().prepareAliases().addAlias("test2", "test-alias").execute().actionGet(); + AcknowledgedResponse aliasesResponse2 = client.admin().indices().prepareAliases().addAlias("test2", "test-alias") + .execute().actionGet(); assertThat(aliasesResponse2.isAcknowledged(), equalTo(true)); AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test-alias").execute().actionGet(); diff --git a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index df22e81aa6a..d983262581f 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -107,7 +107,8 @@ public class RareClusterStateIT extends ESIntegTestCase { buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT))); // open index - final IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData() + .index(index)).state(IndexMetaData.State.OPEN).build(); builder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true)); builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); @@ -235,7 +236,8 @@ public class RareClusterStateIT extends ESIntegTestCase { // Add a new mapping... final AtomicReference putMappingResponse = new AtomicReference<>(); - client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener() { + client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute( + new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { putMappingResponse.set(response); @@ -248,7 +250,8 @@ public class RareClusterStateIT extends ESIntegTestCase { }); // ...and wait for mappings to be available on master assertBusy(() -> { - ImmutableOpenMap indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index"); + ImmutableOpenMap indexMappings = client().admin().indices() + .prepareGetMappings("index").get().getMappings().get("index"); assertNotNull(indexMappings); MappingMetaData typeMappings = indexMappings.get("type"); assertNotNull(typeMappings); @@ -349,7 +352,8 @@ public class RareClusterStateIT extends ESIntegTestCase { internalCluster().setDisruptionScheme(disruption); disruption.startDisrupting(); final AtomicReference putMappingResponse = new AtomicReference<>(); - client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener() { + client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute( + new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { putMappingResponse.set(response); diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 44ca66d571d..339a87cb4bc 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -136,8 +136,10 @@ public class IndexStatsIT extends ESIntegTestCase { client().admin().indices().prepareRefresh().execute().actionGet(); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); - IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true) + .execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L)); // sort to load it to field data... @@ -145,7 +147,8 @@ public class IndexStatsIT extends ESIntegTestCase { client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); @@ -154,19 +157,32 @@ public class IndexStatsIT extends ESIntegTestCase { client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet(); // now check the per field stats - nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), greaterThan(0L)); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes())); + nodesStats = client().admin().cluster().prepareNodesStats("data:true") + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")) + .execute().actionGet(); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), + lessThan(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes())); - indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet(); + indicesStats = client().admin().indices().prepareStats("test") + .clear() + .setFieldData(true) + .setFieldDataFields("*") + .execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), greaterThan(0L)); - assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes())); + assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), + lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes())); client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L)); @@ -184,8 +200,10 @@ public class IndexStatsIT extends ESIntegTestCase { NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); - assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) @@ -205,8 +223,10 @@ public class IndexStatsIT extends ESIntegTestCase { nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); - assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L)); indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) @@ -218,8 +238,10 @@ public class IndexStatsIT extends ESIntegTestCase { Thread.sleep(100); // Make sure the filter cache entries have been removed... nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); - assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) @@ -263,15 +285,22 @@ public class IndexStatsIT extends ESIntegTestCase { } } - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(), equalTo(0L)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), equalTo(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), equalTo(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getHitCount(), equalTo(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMissCount(), equalTo(0L)); for (int i = 0; i < 10; i++) { - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), + equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache(). + getMemorySizeInBytes(), greaterThan(0L)); } - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(), greaterThan(0L)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), greaterThan(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache(). + getHitCount(), greaterThan(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache(). + getMissCount(), greaterThan(0L)); // index the data again... IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; @@ -284,34 +313,49 @@ public class IndexStatsIT extends ESIntegTestCase { } indexRandom(true, builders); refresh(); - assertBusy(() -> assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L))); + assertBusy(() -> { + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), equalTo(0L)); + }); for (int i = 0; i < 10; i++) { - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), + equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), greaterThan(0L)); } client().admin().indices().prepareClearCache().setRequestCache(true).get(); // clean the cache - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), equalTo(0L)); // test explicit request parameter - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(false).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(false).get() + .getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), equalTo(0L)); - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(true).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(true).get() + .getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), greaterThan(0L)); // set the index level setting to false, and see that the reverse works client().admin().indices().prepareClearCache().setRequestCache(true).get(); // clean the cache - assertAcked(client().admin().indices().prepareUpdateSettings("idx").setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false))); + assertAcked(client().admin().indices().prepareUpdateSettings("idx") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false))); - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get() + .getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), equalTo(0L)); - assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(true).get().getHits().getTotalHits(), equalTo((long) numDocs)); - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).setRequestCache(true).get() + .getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache() + .getMemorySizeInBytes(), greaterThan(0L)); } public void testNonThrottleStats() throws Exception { @@ -730,10 +774,13 @@ public class IndexStatsIT extends ESIntegTestCase { assertAcked(prepareCreate("test1") .addMapping( "_doc", - "{ \"properties\": { \"bar\": { \"type\": \"text\", \"fields\": { \"completion\": { \"type\": \"completion\" }}},\"baz\": { \"type\": \"text\", \"fields\": { \"completion\": { \"type\": \"completion\" }}}}}", XContentType.JSON)); + "{ \"properties\": { \"bar\": { \"type\": \"text\", \"fields\": { \"completion\": { \"type\": \"completion\" }}}" + + ",\"baz\": { \"type\": \"text\", \"fields\": { \"completion\": { \"type\": \"completion\" }}}}}", + XContentType.JSON)); ensureGreen(); - client().prepareIndex("test1", "_doc", Integer.toString(1)).setSource("{\"bar\":\"bar\",\"baz\":\"baz\"}", XContentType.JSON).get(); + client().prepareIndex("test1", "_doc", Integer.toString(1)).setSource("{\"bar\":\"bar\",\"baz\":\"baz\"}" + , XContentType.JSON).get(); refresh(); IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); @@ -979,7 +1026,8 @@ public class IndexStatsIT extends ESIntegTestCase { // the query cache has an optimization that disables it automatically if there is contention, // so we run it in an assertBusy block which should eventually succeed assertBusy(() -> { - assertSearchResponse(client().prepareSearch("index").setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); + assertSearchResponse(client().prepareSearch("index") + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); IndicesStatsResponse stats = client().admin().indices().prepareStats("index").setQueryCache(true).get(); assertCumulativeQueryCacheStats(stats); assertThat(stats.getTotal().queryCache.getHitCount(), equalTo(0L)); @@ -989,7 +1037,8 @@ public class IndexStatsIT extends ESIntegTestCase { }); assertBusy(() -> { - assertSearchResponse(client().prepareSearch("index").setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); + assertSearchResponse(client().prepareSearch("index") + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); IndicesStatsResponse stats = client().admin().indices().prepareStats("index").setQueryCache(true).get(); assertCumulativeQueryCacheStats(stats); assertThat(stats.getTotal().queryCache.getHitCount(), greaterThan(0L)); @@ -1020,7 +1069,8 @@ public class IndexStatsIT extends ESIntegTestCase { client().prepareIndex("index", "type", "2").setSource("foo", "baz")); assertBusy(() -> { - assertSearchResponse(client().prepareSearch("index").setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); + assertSearchResponse(client().prepareSearch("index") + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.matchQuery("foo", "baz"))).get()); IndicesStatsResponse stats = client().admin().indices().prepareStats("index").setQueryCache(true).get(); assertCumulativeQueryCacheStats(stats); assertThat(stats.getTotal().queryCache.getHitCount(), greaterThan(0L)); diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index f9028a51a3c..a0e68b560ee 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -340,7 +340,8 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "_name", node4) )); - assertFalse(client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut()); + assertFalse(client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).setWaitForGreenStatus() + .setWaitForNodes("5").get().isTimedOut()); // disable allocation to control the situation more easily assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() @@ -406,7 +407,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { final int numShards = scaledRandomIntBetween(2, 10); assertAcked(prepareCreate("test") - .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)) ); ensureGreen("test"); @@ -424,9 +427,12 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { // disable relocations when we do this, to make sure the shards are not relocated from node2 // due to rebalancing, and delete its content - client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)).get(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)) + .get(); - ClusterApplierService clusterApplierService = internalCluster().getInstance(ClusterService.class, nonMasterNode).getClusterApplierService(); + ClusterApplierService clusterApplierService = internalCluster().getInstance(ClusterService.class, nonMasterNode) + .getClusterApplierService(); ClusterState currentState = clusterApplierService.state(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); for (int j = 0; j < numShards; j++) { diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index ed360b7c1c2..35c211dc4ae 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -79,7 +79,8 @@ public class IndicesStoreTests extends ESTestCase { } String currentNodeId = state == ShardRoutingState.UNASSIGNED ? null : randomAlphaOfLength(10); String relocatingNodeId = state == ShardRoutingState.RELOCATING ? randomAlphaOfLength(10) : null; - routingTable.addShard(TestShardRouting.newShardRouting("test", i, currentNodeId, relocatingNodeId, j == 0, state, unassignedInfo)); + routingTable.addShard(TestShardRouting.newShardRouting("test", i, currentNodeId, relocatingNodeId, j == 0, state, + unassignedInfo)); } } @@ -95,10 +96,12 @@ public class IndicesStoreTests extends ESTestCase { for (int i = 0; i < numShards; i++) { int localNodeIndex = randomInt(numReplicas); boolean primaryOnLocalNode = i == localShardId && localNodeIndex == numReplicas; - routingTable.addShard(TestShardRouting.newShardRouting("test", i, primaryOnLocalNode ? localNode.getId() : randomAlphaOfLength(10), true, ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting("test", i, primaryOnLocalNode ? localNode.getId() : + randomAlphaOfLength(10), true, ShardRoutingState.STARTED)); for (int j = 0; j < numReplicas; j++) { boolean replicaOnLocalNode = i == localShardId && localNodeIndex == j; - routingTable.addShard(TestShardRouting.newShardRouting("test", i, replicaOnLocalNode ? localNode.getId() : randomAlphaOfLength(10), false, ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting("test", i, replicaOnLocalNode ? localNode.getId() : + randomAlphaOfLength(10), false, ShardRoutingState.STARTED)); } }