Merge branch 'master' into feature/rank-eval

This commit is contained in:
Christoph Büscher 2016-09-22 12:21:20 +02:00
commit 1909877728
63 changed files with 609 additions and 414 deletions

View File

@ -4,4 +4,4 @@ test -> test
verify -> check
verify -Dskip.unit.tests -> integTest
package -DskipTests -> assemble
install -DskipTests -> install
install -DskipTests -> publishToMavenLocal

View File

@ -176,6 +176,9 @@ public class PluginBuildPlugin extends BuildPlugin {
/** Find the reponame. */
static String urlFromOrigin(String origin) {
if (origin == null) {
return null // best effort, the url doesnt really matter, it is just required by maven central
}
if (origin.startsWith('https')) {
return origin
}
@ -218,13 +221,16 @@ public class PluginBuildPlugin extends BuildPlugin {
zip(MavenPublication) {
artifact project.bundlePlugin
}
// HUGE HACK: the underlying maven publication library refuses to deploy any attached artifacts
// when the packaging type is set to 'pom'. So here we create another publication using the same
// name that has the "real" pom, and rely on the fact that gradle will execute the publish tasks
// in alphabetical order. We cannot setup a dependency between the tasks because the publishing
// tasks are created *extremely* late in the configuration phase, so that we cannot get ahold
// of the actual task. Furthermore, this entire hack only exists so we can make publishing to
// maven local work, since we publish to maven central externally.
/* HUGE HACK: the underlying maven publication library refuses to deploy any attached artifacts
* when the packaging type is set to 'pom'. But Sonatype's OSS repositories require source files
* for artifacts that are of type 'zip'. We already publish the source and javadoc for Elasticsearch
* under the various other subprojects. So here we create another publication using the same
* name that has the "real" pom, and rely on the fact that gradle will execute the publish tasks
* in alphabetical order. This lets us publish the zip file and even though the pom says the
* type is 'pom' instead of 'zip'. We cannot setup a dependency between the tasks because the
* publishing tasks are created *extremely* late in the configuration phase, so that we cannot get
* ahold of the actual task. Furthermore, this entire hack only exists so we can make publishing to
* maven local work, since we publish to maven central externally. */
zipReal(MavenPublication) {
pom.withXml { XmlProvider xml ->
Node root = xml.asNode()

View File

@ -59,8 +59,7 @@ class PrecommitTasks {
* use the NamingConventionsCheck we break the circular dependency
* here.
*/
// https://github.com/elastic/elasticsearch/issues/20243
// precommitTasks.add(configureLoggerUsage(project))
precommitTasks.add(configureLoggerUsage(project))
}

View File

@ -95,11 +95,11 @@ class ClusterConfiguration {
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.echo("==> [${new Date()}] checking health: http://${node.httpUri()}/_cluster/health?wait_for_nodes>=${numNodes}")
ant.echo("==> [${new Date()}] checking health: http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}")
// checking here for wait_for_nodes to be >= the number of nodes because its possible
// this cluster is attempting to connect to nodes created by another task (same cluster name),
// so there will be more nodes in that case in the cluster state
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes>=${numNodes}",
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}",
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)

View File

@ -147,7 +147,6 @@ class ClusterFormationTasks {
setup = configureStopTask(taskName(task, node, 'stopPrevious'), project, setup, node)
setup = configureExtractTask(taskName(task, node, 'extract'), project, setup, node, configuration)
setup = configureWriteConfigTask(taskName(task, node, 'configure'), project, setup, node, seedNode)
setup = configureExtraConfigFilesTask(taskName(task, node, 'extraConfig'), project, setup, node)
setup = configureCopyPluginsTask(taskName(task, node, 'copyPlugins'), project, setup, node)
// install modules
@ -162,6 +161,10 @@ class ClusterFormationTasks {
setup = configureInstallPluginTask(taskName(task, node, actionName), project, setup, node, plugin.getValue())
}
// sets up any extra config files that need to be copied over to the ES instance;
// its run after plugins have been installed, as the extra config files may belong to plugins
setup = configureExtraConfigFilesTask(taskName(task, node, 'extraConfig'), project, setup, node)
// extra setup commands
for (Map.Entry<String, Object[]> command : node.config.setupCommands.entrySet()) {
// the first argument is the actual script name, relative to home

View File

@ -633,8 +633,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.repositories.RepositoryMissingException::new, 107),
DOCUMENT_SOURCE_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentSourceMissingException.class,
org.elasticsearch.index.engine.DocumentSourceMissingException::new, 109),
FLUSH_NOT_ALLOWED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.FlushNotAllowedEngineException.class,
org.elasticsearch.index.engine.FlushNotAllowedEngineException::new, 110),
// 110 used to be FlushNotAllowedEngineException
NO_CLASS_SETTINGS_EXCEPTION(org.elasticsearch.common.settings.NoClassSettingsException.class,
org.elasticsearch.common.settings.NoClassSettingsException::new, 111),
BIND_TRANSPORT_EXCEPTION(org.elasticsearch.transport.BindTransportException.class,

View File

@ -40,7 +40,7 @@ import java.io.IOException;
public class FlushRequest extends BroadcastRequest<FlushRequest> {
private boolean force = false;
private boolean waitIfOngoing = false;
private boolean waitIfOngoing = true;
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
@ -61,6 +61,7 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
/**
* if set to <tt>true</tt> the flush will block
* if a another flush operation is already running until the flush can be performed.
* The default is <code>true</code>
*/
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
/**
@ -247,4 +249,14 @@ public abstract class RecoverySource implements Writeable, ToXContent {
return "peer recovery";
}
}
private static EnumSet<RecoverySource.Type> INITIAL_RECOVERY_TYPES = EnumSet.of(Type.EMPTY_STORE, Type.LOCAL_SHARDS, Type.SNAPSHOT);
/**
* returns true for recovery types that indicate that a primary is being allocated for the very first time.
* This recoveries can be controlled by {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING}
*/
public static boolean isInitialRecovery(RecoverySource.Type type) {
return INITIAL_RECOVERY_TYPES.contains(type);
}
}

View File

@ -706,6 +706,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
updateAssigned(candidate, reinitializedShard);
inactivePrimaryCount++;
inactiveShardCount++;
addRecovery(reinitializedShard);
return reinitializedShard;
}

View File

@ -93,8 +93,8 @@ public class FilterAllocationDecider extends AllocationDecider {
// this is a setting that can only be set within the system!
IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters();
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE &&
initialRecoveryFilters != null &&
if (initialRecoveryFilters != null &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) &&
initialRecoveryFilters.match(node.node()) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match index initial recovery filters [%s]",
indexMd.includeFilters());

View File

@ -20,6 +20,8 @@
package org.elasticsearch.common.util;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.Loggers;
@ -64,8 +66,8 @@ public class IndexFolderUpgrader {
} catch (NoSuchFileException | FileNotFoundException exception) {
// thrown when the source is non-existent because the folder was renamed
// by another node (shared FS) after we checked if the target exists
logger.error("multiple nodes trying to upgrade [{}] in parallel, retry upgrading with single node",
exception, target);
logger.error((Supplier<?>) () -> new ParameterizedMessage("multiple nodes trying to upgrade [{}] in parallel, retry " +
"upgrading with single node", target), exception);
throw exception;
} finally {
if (success) {

View File

@ -479,7 +479,9 @@ public abstract class Engine implements Closeable {
try {
length = directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException e) {
logger.warn("Tried to query fileLength but file is gone [{}] [{}]", e, directory, file);
final Directory finalDirectory = directory;
logger.warn((Supplier<?>)
() -> new ParameterizedMessage("Tried to query fileLength but file is gone [{}] [{}]", finalDirectory, file), e);
} catch (IOException e) {
final Directory finalDirectory = directory;
logger.warn(
@ -1105,8 +1107,6 @@ public abstract class Engine implements Closeable {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
} catch (FlushNotAllowedEngineException ex) {
logger.debug("flush not allowed during flushAndClose - skipping");
} catch (EngineClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
@ -1233,4 +1233,11 @@ public abstract class Engine implements Closeable {
* This operation will close the engine if the recovery fails.
*/
public abstract Engine recoverFromTranslog() throws IOException;
/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
public boolean isRecovering() {
return false;
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
*
*/
public class FlushNotAllowedEngineException extends EngineException {
public FlushNotAllowedEngineException(ShardId shardId, String msg) {
super(shardId, msg);
}
public FlushNotAllowedEngineException(StreamInput in) throws IOException{
super(in);
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -116,7 +116,7 @@ public class InternalEngine extends Engine {
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean allowCommits = new AtomicBoolean(true);
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@ -163,8 +163,9 @@ public class InternalEngine extends Engine {
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
}
@ -190,14 +191,14 @@ public class InternalEngine extends Engine {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (allowCommits.get()) {
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
} catch (Exception e) {
try {
allowCommits.set(false); // just play safe and never allow commits on this
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
failEngine("failed to recover from translog", e);
} catch (Exception inner) {
e.addSuppressed(inner);
@ -221,8 +222,8 @@ public class InternalEngine extends Engine {
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert allowCommits.get() == false : "commits are allowed but shouldn't";
allowCommits.set(true); // we are good - now we can commit
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
@ -765,7 +766,7 @@ public class InternalEngine extends Engine {
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
throw new FlushNotAllowedEngineException(shardId, "already flushing...");
return new CommitId(lastCommittedSegmentInfos.getId());
}
} else {
logger.trace("acquired flush lock immediately");
@ -1287,8 +1288,8 @@ public class InternalEngine extends Engine {
// if we are in this stage we have to prevent flushes from this
// engine otherwise we might loose documents if the flush succeeds
// and the translog recover fails we we "commit" the translog on flush.
if (allowCommits.get() == false) {
throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
if (pendingTranslogRecovery.get()) {
throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
}
}
@ -1349,4 +1350,9 @@ public class InternalEngine extends Engine {
boolean indexWriterHasDeletions() {
return indexWriter.hasDeletions();
}
@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}
}

View File

@ -730,7 +730,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
return getEngine().syncFlush(syncId, expectedCommitId);
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovery" +
" from translog");
}
return engine.syncFlush(syncId, expectedCommitId);
}
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
@ -741,11 +746,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
verifyStartedOrRecovering();
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
@ -1165,7 +1175,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
indexEventListener.onShardInactive(this);
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
}
}
}

View File

@ -388,7 +388,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
} catch (FileNotFoundException | NoSuchFileException ex) {
logger.info("Failed to open / find files while reading metadata snapshot");
} catch (ShardLockObtainFailedException ex) {
logger.info("{}: failed to obtain shard lock", ex, shardId);
logger.info((Supplier<?>) () -> new ParameterizedMessage("{}: failed to obtain shard lock", shardId), ex);
}
return MetadataSnapshot.EMPTY;
}
@ -420,7 +420,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
} catch (ShardLockObtainFailedException ex) {
logger.error("{} unable to acquire shard lock", ex, shardId);
logger.error((Supplier<?>) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex);
throw new IOException(ex);
}
}

View File

@ -58,34 +58,21 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
return firstOperationOffset;
}
public Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
return read(checksumStreamInput);
}
}
/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
protected final int readSize(ByteBuffer reusableBuffer, long position) {
protected final int readSize(ByteBuffer reusableBuffer, long position) throws IOException {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try {
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e);
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
}
public Translog.Snapshot newSnapshot() {

View File

@ -384,31 +384,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return newFile;
}
/**
* Read the Operation object from the given location. This method will try to read the given location from
* the current or from the currently committing translog file. If the location is in a file that has already
* been closed or even removed the method will return <code>null</code> instead.
*/
Translog.Operation read(Location location) { // TODO this is only here for testing - we can remove it?
try (ReleasableLock lock = readLock.acquire()) {
final BaseTranslogReader reader;
final long currentGeneration = current.getGeneration();
if (currentGeneration == location.generation) {
reader = current;
} else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) {
reader = readers.get(readers.size() - 1);
} else if (currentGeneration < location.generation) {
throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]");
} else {
return null;
}
return reader.read(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}
/**
* Adds a delete / index operations to the transaction log.
*
@ -432,7 +407,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
assert assertBytesAtLocation(location, bytes);
return location;
}
} catch (AlreadyClosedException | IOException ex) {
@ -469,12 +443,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
// tests can override this
ByteBuffer buffer = ByteBuffer.allocate(location.size);
current.readBytes(buffer, location.translogLocation);
return new BytesArray(buffer.array()).equals(expectedBytes);
}
/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
private final int totalOperations;
protected final long length;
@ -51,7 +51,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna
}
@Override
public final int totalOperations() {
public int totalOperations() {
return totalOperations;
}
@ -64,7 +64,7 @@ public class TranslogSnapshot extends BaseTranslogReader implements Translog.Sna
}
}
protected final Translog.Operation readOperation() throws IOException {
protected Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
@ -386,8 +385,8 @@ public class IndexingMemoryController extends AbstractComponent implements Index
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException | FlushNotAllowedEngineException e) {
logger.trace("ignore exception while checking if shard {} is inactive", e, shard.shardId());
} catch (EngineClosedException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
}
}
}

View File

@ -245,7 +245,7 @@ public class PluginsService extends AbstractComponent {
try {
reference.onModuleMethod.invoke(plugin.v2(), module);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.warn("plugin {}, failed to invoke custom onModule method", e, plugin.v1().getName());
logger.warn((Supplier<?>) () -> new ParameterizedMessage("plugin {}, failed to invoke custom onModule method", plugin.v1().getName()), e);
throw new ElasticsearchException("failed to invoke onModule", e);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("plugin {}, failed to invoke custom onModule method", plugin.v1().getName()), e);

View File

@ -395,7 +395,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (SnapshotMissingException ex) {
throw ex;
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
logger.warn("cannot read snapshot file [{}]", ex, snapshotId);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
}
MetaData metaData = null;
try {
@ -405,7 +405,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
metaData = readSnapshotMetaData(snapshotId, null, repositoryData.resolveIndices(indices), true);
}
} catch (IOException | SnapshotException ex) {
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("cannot read metadata for snapshot [{}]", snapshotId), ex);
}
try {
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
@ -601,7 +601,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getUUID()), false);
} catch (ElasticsearchParseException | IOException ex) {
if (ignoreIndexErrors) {
logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index.getName());
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index.getName()), ex);
} else {
throw ex;
}

View File

@ -72,10 +72,10 @@ public class NativeScriptEngineService extends AbstractComponent implements Scri
@Override
public SearchScript search(CompiledScript compiledScript, final SearchLookup lookup, @Nullable final Map<String, Object> vars) {
final NativeScriptFactory scriptFactory = (NativeScriptFactory) compiledScript.compiled();
final AbstractSearchScript script = (AbstractSearchScript) scriptFactory.newScript(vars);
return new SearchScript() {
@Override
public LeafSearchScript getLeafSearchScript(LeafReaderContext context) throws IOException {
AbstractSearchScript script = (AbstractSearchScript) scriptFactory.newScript(vars);
script.setLookup(lookup.getLeafSearchLookup(context));
return script;
}

View File

@ -757,7 +757,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(107, org.elasticsearch.repositories.RepositoryMissingException.class);
ids.put(108, null);
ids.put(109, org.elasticsearch.index.engine.DocumentSourceMissingException.class);
ids.put(110, org.elasticsearch.index.engine.FlushNotAllowedEngineException.class);
ids.put(110, null); // FlushNotAllowedEngineException was removed in 5.0
ids.put(111, org.elasticsearch.common.settings.NoClassSettingsException.class);
ids.put(112, org.elasticsearch.transport.BindTransportException.class);
ids.put(113, org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException.class);

View File

@ -388,17 +388,22 @@ public class CreateIndexIT extends ESIntegTestCase {
.put("index.blocks.write", true)).get();
ensureGreen();
// now merge source into a single shard index
final boolean createWithReplicas = randomBoolean();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
if (createWithReplicas == false) {
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}
for (int i = 20; i < 40; i++) {
client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
}

View File

@ -49,7 +49,7 @@ public class FlushBlocksIT extends ESIntegTestCase {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse response = client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().actionGet();
FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {

View File

@ -54,7 +54,7 @@ public class IndicesSegmentsRequestTests extends ESSingleNodeTestCase {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush("test").get();
}
public void testBasic() {

View File

@ -213,7 +213,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
builders[i] = client().prepareIndex(index, "type").setSource("field", "value");
}
indexRandom(true, builders);
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet();
client().admin().indices().prepareFlush().setForce(true).execute().actionGet();
}
private static final class IndexNodePredicate implements Predicate<Settings> {

View File

@ -25,28 +25,34 @@ import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.Collections;
import java.util.Arrays;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
public class FilterAllocationDeciderTests extends ESAllocationTestCase {
public void testFilterInitialAllocation() {
public void testFilterInitialRecovery() {
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY,
Collections.singleton(filterAllocationDecider));
Arrays.asList(filterAllocationDecider, new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY)));
AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders,
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id",
@ -61,27 +67,31 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0, false);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node2")
,allocation), Decision.YES);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node1")
,allocation), Decision.NO);
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
state = service.reroute(state, "try allocate again");
routingTable = state.routingTable();
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2");
state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shards());
state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shardsWithState(INITIALIZING));
routingTable = state.routingTable();
// ok now we are started and can be allocated anywhere!! lets see...
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), STARTED);
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2");
// replicas should be initializing
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1");
// we fail it again to check if we are initializing immediately on the other node
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0));
@ -100,20 +110,40 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
}
private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
MetaData.Builder metaBuilder = MetaData.builder();
metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT).put(settings))
.numberOfShards(1).numberOfReplicas(0));
MetaData metaData = metaBuilder.build();
boolean shrinkIndex = randomBoolean();
MetaData.Builder metaData = MetaData.builder();
final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings);
final IndexMetaData sourceIndex;
if (shrinkIndex) {
//put a fake closed source index
sourceIndex = IndexMetaData.builder("sourceIndex")
.settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0).build();
metaData.put(sourceIndex, false);
indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID());
indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName());
} else {
sourceIndex = null;
}
final IndexMetaData indexMetaData = IndexMetaData.builder("idx").settings(indexSettings)
.numberOfShards(1).numberOfReplicas(1).build();
metaData.put(indexMetaData, false);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index("idx"));
if (shrinkIndex) {
routingTableBuilder.addAsFromCloseToOpen(sourceIndex);
routingTableBuilder.addAsNew(indexMetaData);
} if (randomBoolean()) {
routingTableBuilder.addAsNew(indexMetaData);
} else {
routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource(
new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")),
Version.CURRENT, indexMetaData.getIndex().getName()));
}
RoutingTable routingTable = routingTableBuilder.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
routingTable = service.reroute(clusterState, "reroute", false).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
return clusterState;
return service.reroute(clusterState, "reroute", false);
}
}

View File

@ -529,7 +529,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
} catch (InterruptedException e) {
// fine - semaphore interrupt
} catch (AssertionError | Exception e) {
logger.info("unexpected exception in background thread of [{}]", e, node);
logger.info((Supplier<?>) () -> new ParameterizedMessage("unexpected exception in background thread of [{}]", node), e);
}
}
});

View File

@ -417,7 +417,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {

View File

@ -80,7 +80,7 @@ public class ReusePeerRecoverySharedTest {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
// just wait for merges
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get();
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
if (useSyncIds == false) {
logger.info("--> disabling allocation while the cluster is shut down");

View File

@ -142,7 +142,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("foo", "doc", ""+i).setSource("foo", "bar").get();
}
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()

View File

@ -586,6 +586,7 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@ -594,13 +595,16 @@ public class InternalEngineTests extends ESTestCase {
}
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
assertFalse(engine.isRecovering());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
@ -2114,6 +2118,7 @@ public class InternalEngineTests extends ESTestCase {
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
@ -2126,6 +2131,7 @@ public class InternalEngineTests extends ESTestCase {
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));

View File

@ -159,7 +159,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -262,7 +262,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -408,7 +408,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -491,7 +491,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -546,7 +546,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);

View File

@ -85,6 +85,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -206,53 +207,6 @@ public class TranslogTests extends ESTestCase {
return string;
}
public void testRead() throws IOException {
Location loc0 = translog.getLastWriteLocation();
assertNotNull(loc0);
Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
assertThat(loc1, greaterThan(loc0));
assertThat(translog.getLastWriteLocation(), greaterThan(loc1));
Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
assertThat(loc2, greaterThan(loc1));
assertThat(translog.getLastWriteLocation(), greaterThan(loc2));
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
Translog.Location lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
assertThat(loc3, greaterThan(loc2));
assertThat(translog.getLastWriteLocation(), greaterThan(loc3));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.prepareCommit();
/*
* The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can
* safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation
* and less than the location of the next write operation.
*/
assertThat(translog.getLastWriteLocation(), greaterThan(lastLocBeforeSync));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.commit();
assertNull(translog.read(loc1));
assertNull(translog.read(loc2));
assertNull(translog.read(loc3));
try {
translog.read(new Translog.Location(translog.currentFileGeneration() + 1, 17, 35));
fail("generation is greater than the current");
} catch (IllegalStateException ex) {
// expected
}
}
public void testSimpleOperations() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
@ -441,7 +395,7 @@ public class TranslogTests extends ESTestCase {
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
}
static class LocationOperation {
static class LocationOperation implements Comparable<LocationOperation> {
final Translog.Operation operation;
final Translog.Location location;
@ -450,6 +404,10 @@ public class TranslogTests extends ESTestCase {
this.location = location;
}
@Override
public int compareTo(LocationOperation o) {
return location.compareTo(o.location);
}
}
public void testConcurrentWritesWithVaryingSize() throws Throwable {
@ -478,8 +436,12 @@ public class TranslogTests extends ESTestCase {
threads[i].join(60 * 1000);
}
for (LocationOperation locationOperation : writtenOperations) {
Translog.Operation op = translog.read(locationOperation.location);
List<LocationOperation> collect = writtenOperations.stream().collect(Collectors.toList());
Collections.sort(collect);
Translog.Snapshot snapshot = translog.newSnapshot();
for (LocationOperation locationOperation : collect) {
Translog.Operation op = snapshot.next();
assertNotNull(op);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
@ -505,6 +467,7 @@ public class TranslogTests extends ESTestCase {
}
}
assertNull(snapshot.next());
}
@ -521,13 +484,16 @@ public class TranslogTests extends ESTestCase {
corruptTranslogs(translogDir);
AtomicInteger corruptionsCaught = new AtomicInteger(0);
Translog.Snapshot snapshot = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
translog.read(location);
Translog.Operation next = snapshot.next();
assertNotNull(next);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}
}
expectThrows(TranslogCorruptedException.class, () -> snapshot.next());
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
}
@ -544,15 +510,12 @@ public class TranslogTests extends ESTestCase {
truncateTranslogs(translogDir);
AtomicInteger truncations = new AtomicInteger(0);
Translog.Snapshot snap = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
translog.read(location);
} catch (ElasticsearchException e) {
if (e.getCause() instanceof EOFException) {
truncations.incrementAndGet();
} else {
throw e;
}
assertNotNull(snap.next());
} catch (EOFException e) {
truncations.incrementAndGet();
}
}
assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1));
@ -860,8 +823,14 @@ public class TranslogTests extends ESTestCase {
}
assertEquals(max.generation, translog.currentFileGeneration());
final Translog.Operation read = translog.read(max);
assertEquals(read.getSource().source.utf8ToString(), Integer.toString(count));
Translog.Snapshot snap = translog.newSnapshot();
Translog.Operation next;
Translog.Operation maxOp = null;
while ((next = snap.next()) != null) {
maxOp = next;
}
assertNotNull(maxOp);
assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count));
}
public static Translog.Location max(Translog.Location a, Translog.Location b) {
@ -884,30 +853,24 @@ public class TranslogTests extends ESTestCase {
}
}
assertEquals(translogOperations, translog.totalOperations());
final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
assertEquals(lastSynced + 1, reader.totalOperations());
Translog.Snapshot snapshot = reader.newSnapshot();
for (int op = 0; op < translogOperations; op++) {
Translog.Location location = locations.get(op);
if (op <= lastSynced) {
final Translog.Operation read = reader.read(location);
final Translog.Operation read = snapshot.next();
assertEquals(Integer.toString(op), read.getSource().source.utf8ToString());
} else {
try {
reader.read(location);
fail("read past checkpoint");
} catch (EOFException ex) {
}
Translog.Operation next = snapshot.next();
assertNull(next);
}
}
try {
reader.read(lastLocation);
fail("read past checkpoint");
} catch (EOFException ex) {
}
Translog.Operation next = snapshot.next();
assertNull(next);
}
assertEquals(translogOperations + 1, translog.totalOperations());
translog.close();
@ -1618,11 +1581,6 @@ public class TranslogTests extends ESTestCase {
}
};
}
@Override
protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
return true; // we don't wanna fail in the assert
}
};
}

View File

@ -206,10 +206,14 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
break;
}
String name = "index_" + randomAsciiOfLength(15).toLowerCase(Locale.ROOT);
CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder()
Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2))
.build()).waitForActiveShards(ActiveShardCount.NONE);
.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2));
if (randomBoolean()) {
settingsBuilder.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true);
}
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}

View File

@ -62,7 +62,7 @@ public class FlushIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(10);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
for (int j = 0; j < 10; j++) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute(new ActionListener<FlushResponse>() {
client().admin().indices().prepareFlush("test").execute(new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
try {

View File

@ -348,7 +348,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
}
indexRandom(true, builder);
if (randomBoolean()) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).execute().get();
client().admin().indices().prepareFlush("test").setForce(true).execute().get();
}
client().admin().indices().prepareClose("test").execute().get();

View File

@ -111,7 +111,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
}
ensureGreen();
// ensure we have flushed segments and make them a big one via optimize
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -67,7 +67,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
String id = Integer.toString(i);
client().prepareIndex(indexName, "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush(indexName).get();
logger.info("--> create first snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()

View File

@ -78,8 +78,15 @@ public class DeprecationRestHandlerTests extends ESTestCase {
ASCIIHeaderGenerator generator = new ASCIIHeaderGenerator();
String value = generator.ofCodeUnitsLength(random(), 1, 50);
assertTrue(DeprecationRestHandler.validHeaderValue(value));
assertSame(value, DeprecationRestHandler.requireValidHeader(value));
if (value.trim().length() == 0) {
// empty text, not a valid header
assertFalse(DeprecationRestHandler.validHeaderValue(value));
Exception e = expectThrows(IllegalArgumentException.class, () -> DeprecationRestHandler.requireValidHeader(value));
assertEquals("header value must contain only US ASCII text", e.getMessage());
} else {
assertTrue(DeprecationRestHandler.validHeaderValue(value));
assertSame(value, DeprecationRestHandler.requireValidHeader(value));
}
}
public void testInvalidHeaderValue() {

View File

@ -99,7 +99,7 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
client().prepareIndex("test", "type", "init" + i).setSource("test", "init").get();
}
client().admin().indices().prepareRefresh("test").execute().get();
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
client().admin().indices().prepareFlush("test").execute().get();
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase;
@ -617,11 +616,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
try {
flush();
} catch (FlushNotAllowedEngineException fnaee) {
// OK
}
flush();
logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
}
}

View File

@ -35,7 +35,16 @@ publishing {
artifactId 'elasticsearch'
artifact buildZip
}
// TODO explain why we can't use the pom build by nebula
/* HUGE HACK: the underlying maven publication library refuses to deploy any attached artifacts
* when the packaging type is set to 'pom'. But Sonatype's OSS repositories require source files
* for artifacts that are of type 'zip'. We already publish the source and javadoc for Elasticsearch
* under the various other subprojects. So here we create another publication using the same
* name that has the "real" pom, and rely on the fact that gradle will execute the publish tasks
* in alphabetical order. This lets us publish the zip file and even though the pom says the
* type is 'pom' instead of 'zip'. We cannot setup a dependency between the tasks because the
* publishing tasks are created *extremely* late in the configuration phase, so that we cannot get
* ahold of the actual task. Furthermore, this entire hack only exists so we can make publishing to
* maven local work, since we publish to maven central externally. */
nebulaRealPom(MavenPublication) {
artifactId 'elasticsearch'
pom.packaging = 'pom'

View File

@ -35,7 +35,16 @@ publishing {
artifactId 'elasticsearch'
artifact buildZip
}
// TODO explain why we can't use the pom build by nebula
/* HUGE HACK: the underlying maven publication library refuses to deploy any attached artifacts
* when the packaging type is set to 'pom'. But Sonatype's OSS repositories require source files
* for artifacts that are of type 'zip'. We already publish the source and javadoc for Elasticsearch
* under the various other subprojects. So here we create another publication using the same
* name that has the "real" pom, and rely on the fact that gradle will execute the publish tasks
* in alphabetical order. This lets us publish the zip file and even though the pom says the
* type is 'pom' instead of 'zip'. We cannot setup a dependency between the tasks because the
* publishing tasks are created *extremely* late in the configuration phase, so that we cannot get
* ahold of the actual task. Furthermore, this entire hack only exists so we can make publishing to
* maven local work, since we publish to maven central externally. */
nebulaRealPom(MavenPublication) {
artifactId 'elasticsearch'
pom.packaging = 'pom'

View File

@ -24,7 +24,6 @@ apply plugin: 'elasticsearch.docs-test'
* only remove entries from this list. When it is empty we'll remove it
* entirely and have a party! There will be cake and everything.... */
buildRestTests.expectedUnconvertedCandidates = [
'plugins/discovery-azure-classic.asciidoc',
'reference/aggregations.asciidoc',
'reference/aggregations/bucket/children-aggregation.asciidoc',
'reference/aggregations/bucket/datehistogram-aggregation.asciidoc',
@ -115,7 +114,6 @@ buildRestTests.expectedUnconvertedCandidates = [
'reference/cat/snapshots.asciidoc',
'reference/cat/templates.asciidoc',
'reference/cat/thread_pool.asciidoc',
'reference/cluster.asciidoc',
'reference/cluster/allocation-explain.asciidoc',
'reference/cluster/nodes-info.asciidoc',
'reference/cluster/nodes-stats.asciidoc',
@ -161,8 +159,6 @@ buildRestTests.expectedUnconvertedCandidates = [
'reference/mapping/types/nested.asciidoc',
'reference/mapping/types/object.asciidoc',
'reference/mapping/types/percolator.asciidoc',
'reference/modules/cluster/misc.asciidoc',
'reference/modules/indices/request_cache.asciidoc',
'reference/modules/scripting/native.asciidoc',
'reference/modules/scripting/security.asciidoc',
'reference/modules/scripting/using.asciidoc',

View File

@ -378,6 +378,7 @@ curl -s https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticse
# Prepare Elasticsearch installation
sudo dpkg -i elasticsearch-{version}.deb
----
// NOTCONSOLE
Check that elasticsearch is running:

View File

@ -15,18 +15,19 @@ example, here are some sample executions of nodes info:
[source,js]
--------------------------------------------------
# Local
curl localhost:9200/_nodes/_local
GET /_nodes/_local
# Address
curl localhost:9200/_nodes/10.0.0.3,10.0.0.4
curl localhost:9200/_nodes/10.0.0.*
GET /_nodes/10.0.0.3,10.0.0.4
GET /_nodes/10.0.0.*
# Names
curl localhost:9200/_nodes/node_name_goes_here
curl localhost:9200/_nodes/node_name_goes_*
GET /_nodes/node_name_goes_here
GET /_nodes/node_name_goes_*
# Attributes (set something like node.attr.rack: 2 in the config)
curl localhost:9200/_nodes/rack:2
curl localhost:9200/_nodes/ra*:2
curl localhost:9200/_nodes/ra*:2*
GET /_nodes/rack:2
GET /_nodes/ra*:2
GET /_nodes/ra*:2*
--------------------------------------------------
// CONSOLE
--
include::cluster/health.asciidoc[]

View File

@ -50,4 +50,4 @@ PUT /_cluster/settings
}
}
-------------------------------
// CONSOLE

View File

@ -42,8 +42,10 @@ The cache can be expired manually with the <<indices-clearcache,`clear-cache` AP
[source,js]
------------------------
curl -XPOST 'localhost:9200/kimchy,elasticsearch/_cache/clear?request_cache=true'
POST /kimchy,elasticsearch/_cache/clear?request_cache=true
------------------------
// CONSOLE
// TEST[s/^/PUT kimchy\nPUT elasticsearch\n/]
[float]
==== Enabling and disabling caching
@ -53,24 +55,26 @@ index as follows:
[source,js]
-----------------------------
curl -XPUT localhost:9200/my_index -d'
PUT /my_index
{
"settings": {
"index.requests.cache.enable": false
}
}
'
-----------------------------
// CONSOLE
It can also be enabled or disabled dynamically on an existing index with the
<<indices-update-settings,`update-settings`>> API:
[source,js]
-----------------------------
curl -XPUT localhost:9200/my_index/_settings -d'
PUT /my_index/_settings
{ "index.requests.cache.enable": true }
'
-----------------------------
// CONSOLE
// TEST[continued]
[float]
==== Enabling and disabling caching per request
@ -80,7 +84,7 @@ caching on a *per-request* basis. If set, it overrides the index-level setting:
[source,js]
-----------------------------
curl 'localhost:9200/my_index/_search?request_cache=true' -d'
GET /my_index/_search?request_cache=true
{
"size": 0,
"aggs": {
@ -91,8 +95,9 @@ curl 'localhost:9200/my_index/_search?request_cache=true' -d'
}
}
}
'
-----------------------------
// CONSOLE
// TEST[continued]
IMPORTANT: If your query uses a script whose result is not deterministic (e.g.
it uses a random function or references the current time) you should set the
@ -137,12 +142,14 @@ by index, with the <<indices-stats,`indices-stats`>> API:
[source,js]
------------------------
curl 'localhost:9200/_stats/request_cache?pretty&human'
GET /_stats/request_cache?human
------------------------
// CONSOLE
or by node with the <<cluster-nodes-stats,`nodes-stats`>> API:
[source,js]
------------------------
curl 'localhost:9200/_nodes/stats/indices/request_cache?pretty&human'
GET /_nodes/stats/indices/request_cache?human
------------------------
// CONSOLE

View File

@ -33,7 +33,7 @@ Save the repository definition to +/etc/apt/sources.list.d/elastic-{major-versi
["source","sh",subs="attributes,callouts"]
--------------------------------------------------
echo "deb https://artifacts.elastic.co/packages/{major-version}/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-{major-version}.list
echo "deb https://artifacts.elastic.co/packages/{major-version}-prerelease/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-{major-version}.list
--------------------------------------------------
[WARNING]

View File

@ -34,7 +34,7 @@ OpenSuSE based distributions, containing:
--------------------------------------------------
[elasticsearch-{major-version}]
name=Elasticsearch repository for {major-version} packages
baseurl=https://artifacts.elastic.co/packages/{major-version}/yum
baseurl=https://artifacts.elastic.co/packages/{major-version}-prerelease/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1

View File

@ -315,7 +315,7 @@ public class GroovyScriptEngineService extends AbstractComponent implements Scri
}
throw ae;
} catch (Exception | NoClassDefFoundError e) {
logger.trace("failed to run {}", e, compiledScript);
logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to run {}", compiledScript), e);
throw new ScriptException("Error evaluating " + compiledScript.name(), e, emptyList(), "", compiledScript.lang());
}
}

View File

@ -262,8 +262,8 @@ public final class EAssignment extends AExpression {
rhs.write(writer, globals); // write the bytecode for the rhs
if (!(rhs instanceof EBinary) || ((EBinary)rhs).cat) {
writer.writeAppendStrings(rhs.actual); // append the rhs's value unless it's also a concatenation
if (!(rhs instanceof EBinary) || !((EBinary)rhs).cat) { // check to see if the rhs has already done a concatenation
writer.writeAppendStrings(rhs.actual); // append the rhs's value since it's hasn't already
}
writer.writeToStrings(); // put the value for string concat onto the stack

View File

@ -21,7 +21,9 @@ package org.elasticsearch.painless;
import static org.elasticsearch.painless.WriterConstants.MAX_INDY_STRING_CONCAT_ARGS;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
public class StringTests extends ScriptTestCase {
@ -222,6 +224,17 @@ public class StringTests extends ScriptTestCase {
});
}
public void testComplexCompoundAssignment() {
Map<String, Object> params = new HashMap<>();
Map<String, Object> ctx = new HashMap<>();
ctx.put("_id", "somerandomid");
params.put("ctx", ctx);
assertEquals("somerandomid.somerandomid", exec("ctx._id += '.' + ctx._id", params, false));
assertEquals("somerandomid.somerandomid", exec("String x = 'somerandomid'; x += '.' + x"));
assertEquals("somerandomid.somerandomid", exec("def x = 'somerandomid'; x += '.' + x"));
}
public void testAppendStringIntoMap() {
assertEquals("nullcat", exec("def a = new HashMap(); a.cat += 'cat'"));
}

View File

@ -17,6 +17,10 @@
* under the License.
*/
import org.elasticsearch.gradle.test.ClusterConfiguration
import org.elasticsearch.gradle.test.ClusterFormationTasks
import org.elasticsearch.gradle.test.NodeInfo
esplugin {
description 'Discovery file plugin enables unicast discovery from hosts stored in a file.'
classname 'org.elasticsearch.discovery.file.FileBasedDiscoveryPlugin'
@ -27,3 +31,29 @@ bundlePlugin {
into 'config'
}
}
task setupSeedNodeAndUnicastHostsFile(type: DefaultTask) {
mustRunAfter(precommit)
}
// setup the initial cluster with one node that will serve as the seed node
// for unicast discovery
ClusterConfiguration config = new ClusterConfiguration(project)
config.clusterName = 'discovery-file-test-cluster'
List<NodeInfo> nodes = ClusterFormationTasks.setup(project, setupSeedNodeAndUnicastHostsFile, config)
File srcUnicastHostsFile = file('build/cluster/unicast_hosts.txt')
// write the unicast_hosts.txt file to a temporary location to be used by the second cluster
setupSeedNodeAndUnicastHostsFile.doLast {
// write the unicast_hosts.txt file to a temp file in the build directory
srcUnicastHostsFile.setText(nodes.get(0).transportUri(), 'UTF-8')
}
// second cluster, which will connect to the first via the unicast_hosts.txt file
integTest {
dependsOn(setupSeedNodeAndUnicastHostsFile)
cluster {
clusterName = 'discovery-file-test-cluster'
extraConfigFile 'discovery-file/unicast_hosts.txt', srcUnicastHostsFile
}
finalizedBy ':plugins:discovery-file:setupSeedNodeAndUnicastHostsFile#stop'
}

View File

@ -1,13 +1,13 @@
# Integration tests for file-based discovery
#
"Discovery File loaded":
"Ensure cluster formed successfully with discovery file":
# make sure both nodes joined the cluster
- do:
cluster.health:
wait_for_nodes: 2
# make sure the cluster was formed with the correct name
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.info: {}
- match: { nodes.$master.plugins.0.name: discovery-file }
- match: { cluster_name: 'discovery-file-test-cluster' } # correct cluster name, we formed the cluster we expected to

View File

@ -18,7 +18,7 @@
},
"wait_if_ongoing": {
"type" : "boolean",
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is true. If set to false the flush will be skipped iff if another flush operation is already running."
},
"ignore_unavailable": {
"type" : "boolean",

View File

@ -1204,7 +1204,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
*/
protected final FlushResponse flush(String... indices) {
waitForRelocation();
FlushResponse actionGet = client().admin().indices().prepareFlush(indices).setWaitIfOngoing(true).execute().actionGet();
FlushResponse actionGet = client().admin().indices().prepareFlush(indices).execute().actionGet();
for (ShardOperationFailedException failure : actionGet.getShardFailures()) {
assertThat("unexpected flush failure " + failure.reason(), failure.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}

View File

@ -21,14 +21,27 @@ import org.elasticsearch.gradle.precommit.PrecommitTasks
dependencies {
compile 'org.ow2.asm:asm-debug-all:5.0.4' // use asm-debug-all as asm-all is broken
compile "org.apache.logging.log4j:log4j-api:${versions.log4j}"
testCompile "org.elasticsearch.test:framework:${version}"
}
// https://github.com/elastic/elasticsearch/issues/20243
// loggerUsageCheck.enabled = false
loggerUsageCheck.enabled = false
forbiddenApisMain.enabled = true // disabled by parent project
forbiddenApisMain {
signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')] // does not depend on core, only jdk signatures
}
jarHell.enabled = true // disabled by parent project
thirdPartyAudit.excludes = [
// log4j
'org.osgi.framework.AdaptPermission',
'org.osgi.framework.AdminPermission',
'org.osgi.framework.Bundle',
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
'org.osgi.framework.BundleEvent',
'org.osgi.framework.SynchronousBundleListener',
'org.osgi.framework.wiring.BundleWire',
'org.osgi.framework.wiring.BundleWiring'
]

View File

@ -19,6 +19,10 @@
package org.elasticsearch.test.loggerusage;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.objectweb.asm.AnnotationVisitor;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
@ -52,9 +56,16 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
public class ESLoggerUsageChecker {
public static final String LOGGER_CLASS = "org.elasticsearch.common.logging.ESLogger";
public static final String THROWABLE_CLASS = "java.lang.Throwable";
public static final List<String> LOGGER_METHODS = Arrays.asList("trace", "debug", "info", "warn", "error");
public static final Type LOGGER_CLASS = Type.getType(Logger.class);
public static final Type THROWABLE_CLASS = Type.getType(Throwable.class);
public static final Type STRING_CLASS = Type.getType(String.class);
public static final Type STRING_ARRAY_CLASS = Type.getType(String[].class);
public static final Type PARAMETERIZED_MESSAGE_CLASS = Type.getType(ParameterizedMessage.class);
public static final Type OBJECT_CLASS = Type.getType(Object.class);
public static final Type OBJECT_ARRAY_CLASS = Type.getType(Object[].class);
public static final Type SUPPLIER_ARRAY_CLASS = Type.getType(Supplier[].class);
public static final Type MARKER_CLASS = Type.getType(Marker.class);
public static final List<String> LOGGER_METHODS = Arrays.asList("trace", "debug", "info", "warn", "error", "fatal");
public static final String IGNORE_CHECKS_ANNOTATION = "org.elasticsearch.common.SuppressLoggerChecks";
@SuppressForbidden(reason = "command line tool")
@ -143,7 +154,7 @@ public class ESLoggerUsageChecker {
simpleClassName = simpleClassName + ".java";
StringBuilder sb = new StringBuilder();
sb.append("Bad usage of ");
sb.append(LOGGER_CLASS).append("#").append(logMethodName);
sb.append(LOGGER_CLASS.getClassName()).append("#").append(logMethodName);
sb.append(": ");
sb.append(errorMessage);
sb.append("\n\tat ");
@ -230,7 +241,7 @@ public class ESLoggerUsageChecker {
} catch (AnalyzerException e) {
throw new RuntimeException("Internal error: failed in analysis step", e);
}
Frame<BasicValue>[] stringFrames = stringPlaceHolderAnalyzer.getFrames();
Frame<BasicValue>[] logMessageFrames = stringPlaceHolderAnalyzer.getFrames();
Frame<BasicValue>[] arraySizeFrames = arraySizeAnalyzer.getFrames();
AbstractInsnNode[] insns = methodNode.instructions.toArray();
int lineNumber = -1;
@ -240,53 +251,141 @@ public class ESLoggerUsageChecker {
LineNumberNode lineNumberNode = (LineNumberNode) insn;
lineNumber = lineNumberNode.line;
}
if (insn.getOpcode() == Opcodes.INVOKEVIRTUAL) {
if (insn.getOpcode() == Opcodes.INVOKEINTERFACE) {
MethodInsnNode methodInsn = (MethodInsnNode) insn;
if (Type.getObjectType(methodInsn.owner).getClassName().equals(LOGGER_CLASS) == false) {
continue;
}
if (LOGGER_METHODS.contains(methodInsn.name) == false) {
continue;
}
BasicValue varArgsSizeObject = getStackValue(arraySizeFrames[i], 0); // last argument
if (varArgsSizeObject instanceof ArraySizeBasicValue == false) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Could not determine size of varargs array"));
continue;
}
ArraySizeBasicValue varArgsSize = (ArraySizeBasicValue) varArgsSizeObject;
Type[] argumentTypes = Type.getArgumentTypes(methodInsn.desc);
BasicValue logMessageLengthObject = getStackValue(stringFrames[i], argumentTypes.length - 1); // first argument
if (logMessageLengthObject instanceof PlaceHolderStringBasicValue == false) {
if (varArgsSize.minValue > 0) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"First argument must be a string constant so that we can statically ensure proper place holder usage"));
continue;
} else {
// don't check logger usage for logger.warn(someObject) as someObject will be fully logged
if (Type.getObjectType(methodInsn.owner).equals(LOGGER_CLASS)) {
if (LOGGER_METHODS.contains(methodInsn.name) == false) {
continue;
}
Type[] argumentTypes = Type.getArgumentTypes(methodInsn.desc);
int markerOffset = 0;
if (argumentTypes[0].equals(MARKER_CLASS)) {
markerOffset = 1;
}
int lengthWithoutMarker = argumentTypes.length - markerOffset;
if (lengthWithoutMarker == 2 &&
argumentTypes[markerOffset + 0].equals(STRING_CLASS) &&
(argumentTypes[markerOffset + 1].equals(OBJECT_ARRAY_CLASS) ||
argumentTypes[markerOffset + 1].equals(SUPPLIER_ARRAY_CLASS))) {
// VARARGS METHOD: debug(Marker?, String, (Object...|Supplier...))
checkArrayArgs(methodNode, logMessageFrames[i], arraySizeFrames[i], lineNumber, methodInsn, markerOffset + 0,
markerOffset + 1);
} else if (lengthWithoutMarker >= 2 &&
argumentTypes[markerOffset + 0].equals(STRING_CLASS) &&
argumentTypes[markerOffset + 1].equals(OBJECT_CLASS)) {
// MULTI-PARAM METHOD: debug(Marker?, String, Object p0, ...)
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0,
lengthWithoutMarker - 1);
} else if ((lengthWithoutMarker == 1 || lengthWithoutMarker == 2) &&
lengthWithoutMarker == 2 ? argumentTypes[markerOffset + 1].equals(THROWABLE_CLASS) : true) {
// all the rest: debug(Marker?, (Message|MessageSupplier|CharSequence|Object|String|Supplier), Throwable?)
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0, 0);
} else {
throw new IllegalStateException("Method invoked on " + LOGGER_CLASS.getClassName() +
" that is not supported by logger usage checker");
}
}
PlaceHolderStringBasicValue logMessageLength = (PlaceHolderStringBasicValue) logMessageLengthObject;
if (logMessageLength.minValue != logMessageLength.maxValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Multiple log messages with conflicting number of place holders"));
continue;
}
if (varArgsSize.minValue != varArgsSize.maxValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Multiple parameter arrays with conflicting sizes"));
continue;
}
assert logMessageLength.minValue == logMessageLength.maxValue && varArgsSize.minValue == varArgsSize.maxValue;
if (logMessageLength.minValue != varArgsSize.minValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Expected " + logMessageLength.minValue + " arguments but got " + varArgsSize.minValue));
continue;
} else if (insn.getOpcode() == Opcodes.INVOKESPECIAL) { // constructor invocation
MethodInsnNode methodInsn = (MethodInsnNode) insn;
if (Type.getObjectType(methodInsn.owner).equals(PARAMETERIZED_MESSAGE_CLASS)) {
Type[] argumentTypes = Type.getArgumentTypes(methodInsn.desc);
if (argumentTypes.length == 2 &&
argumentTypes[0].equals(STRING_CLASS) &&
argumentTypes[1].equals(OBJECT_ARRAY_CLASS)) {
checkArrayArgs(methodNode, logMessageFrames[i], arraySizeFrames[i], lineNumber, methodInsn, 0, 1);
} else if (argumentTypes.length == 2 &&
argumentTypes[0].equals(STRING_CLASS) &&
argumentTypes[1].equals(OBJECT_CLASS)) {
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, 0, 1);
} else if (argumentTypes.length == 3 &&
argumentTypes[0].equals(STRING_CLASS) &&
argumentTypes[1].equals(OBJECT_CLASS) &&
argumentTypes[2].equals(OBJECT_CLASS)) {
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, 0, 2);
} else if (argumentTypes.length == 3 &&
argumentTypes[0].equals(STRING_CLASS) &&
argumentTypes[1].equals(OBJECT_ARRAY_CLASS) &&
argumentTypes[2].equals(THROWABLE_CLASS)) {
checkArrayArgs(methodNode, logMessageFrames[i], arraySizeFrames[i], lineNumber, methodInsn, 0, 1);
} else if (argumentTypes.length == 3 &&
argumentTypes[0].equals(STRING_CLASS) &&
argumentTypes[1].equals(STRING_ARRAY_CLASS) &&
argumentTypes[2].equals(THROWABLE_CLASS)) {
checkArrayArgs(methodNode, logMessageFrames[i], arraySizeFrames[i], lineNumber, methodInsn, 0, 1);
} else {
throw new IllegalStateException("Constructor invoked on " + PARAMETERIZED_MESSAGE_CLASS.getClassName() +
" that is not supported by logger usage checker");
}
}
}
}
}
private void checkFixedArityArgs(MethodNode methodNode, Frame<BasicValue> logMessageFrame, int lineNumber,
MethodInsnNode methodInsn, int messageIndex, int positionalArgsLength) {
PlaceHolderStringBasicValue logMessageLength = checkLogMessageConsistency(methodNode, logMessageFrame, lineNumber, methodInsn,
messageIndex, positionalArgsLength);
if (logMessageLength == null) {
return;
}
if (logMessageLength.minValue != positionalArgsLength) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Expected " + logMessageLength.minValue + " arguments but got " + positionalArgsLength));
return;
}
}
private void checkArrayArgs(MethodNode methodNode, Frame<BasicValue> logMessageFrame, Frame<BasicValue> arraySizeFrame,
int lineNumber, MethodInsnNode methodInsn, int messageIndex, int arrayIndex) {
BasicValue arraySizeObject = getStackValue(arraySizeFrame, methodInsn, arrayIndex);
if (arraySizeObject instanceof ArraySizeBasicValue == false) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Could not determine size of array"));
return;
}
ArraySizeBasicValue arraySize = (ArraySizeBasicValue) arraySizeObject;
PlaceHolderStringBasicValue logMessageLength = checkLogMessageConsistency(methodNode, logMessageFrame, lineNumber, methodInsn,
messageIndex, arraySize.minValue);
if (logMessageLength == null) {
return;
}
if (arraySize.minValue != arraySize.maxValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Multiple parameter arrays with conflicting sizes"));
return;
}
assert logMessageLength.minValue == logMessageLength.maxValue && arraySize.minValue == arraySize.maxValue;
if (logMessageLength.minValue != arraySize.minValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Expected " + logMessageLength.minValue + " arguments but got " + arraySize.minValue));
return;
}
}
private PlaceHolderStringBasicValue checkLogMessageConsistency(MethodNode methodNode, Frame<BasicValue> logMessageFrame,
int lineNumber, MethodInsnNode methodInsn, int messageIndex,
int argsSize) {
BasicValue logMessageLengthObject = getStackValue(logMessageFrame, methodInsn, messageIndex);
if (logMessageLengthObject instanceof PlaceHolderStringBasicValue == false) {
if (argsSize > 0) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"First argument must be a string constant so that we can statically ensure proper place holder usage"));
} else {
// don't check logger usage for logger.warn(someObject)
}
return null;
}
PlaceHolderStringBasicValue logMessageLength = (PlaceHolderStringBasicValue) logMessageLengthObject;
if (logMessageLength.minValue != logMessageLength.maxValue) {
wrongUsageCallback.accept(new WrongLoggerUsage(className, methodNode.name, methodInsn.name, lineNumber,
"Multiple log messages with conflicting number of place holders"));
return null;
}
return logMessageLength;
}
}
private static int calculateNumberOfPlaceHolders(String message) {
@ -300,9 +399,10 @@ public class ESLoggerUsageChecker {
return count;
}
private static BasicValue getStackValue(Frame<BasicValue> f, int index) {
private static BasicValue getStackValue(Frame<BasicValue> f, MethodInsnNode methodInsn, int index) {
int relIndex = Type.getArgumentTypes(methodInsn.desc).length - 1 - index;
int top = f.getStackSize() - 1;
return index <= top ? f.getStack(top - index) : null;
return relIndex <= top ? f.getStack(top - relIndex) : null;
}
private static class IntMinMaxTrackingBasicValue extends BasicValue {

View File

@ -20,31 +20,29 @@
package org.elasticsearch.test.loggerusage;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.SuppressLoggerChecks;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.loggerusage.ESLoggerUsageChecker.WrongLoggerUsage;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ESLoggerUsageTests extends ESTestCase {
// needed to avoid the test suite from failing for having no tests
public void testSoThatTestsDoNotFail() {
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/20243")
public void testLoggerUsageChecks() throws IOException {
for (Method method : getClass().getMethods()) {
if (method.getDeclaringClass().equals(getClass())) {
@ -52,7 +50,8 @@ public class ESLoggerUsageTests extends ESTestCase {
logger.info("Checking logger usage for method {}", method.getName());
InputStream classInputStream = getClass().getResourceAsStream(getClass().getSimpleName() + ".class");
List<WrongLoggerUsage> errors = new ArrayList<>();
ESLoggerUsageChecker.check(errors::add, classInputStream, Predicate.isEqual(method.getName()));
ESLoggerUsageChecker.check(errors::add, classInputStream,
m -> m.equals(method.getName()) || m.startsWith("lambda$" + method.getName()));
if (method.getName().startsWith("checkFail")) {
assertFalse("Expected " + method.getName() + " to have wrong Logger usage", errors.isEmpty());
} else {
@ -65,27 +64,57 @@ public class ESLoggerUsageTests extends ESTestCase {
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/20243")
public void testLoggerUsageCheckerCompatibilityWithESLogger() throws NoSuchMethodException {
assertThat(ESLoggerUsageChecker.LOGGER_CLASS, equalTo(Logger.class.getName()));
assertThat(ESLoggerUsageChecker.THROWABLE_CLASS, equalTo(Throwable.class.getName()));
int varargsMethodCount = 0;
public void testLoggerUsageCheckerCompatibilityWithLog4j2Logger() throws NoSuchMethodException {
for (Method method : Logger.class.getMethods()) {
if (method.isVarArgs()) {
// check that logger usage checks all varargs methods
assertThat(ESLoggerUsageChecker.LOGGER_METHODS, hasItem(method.getName()));
varargsMethodCount++;
if (ESLoggerUsageChecker.LOGGER_METHODS.contains(method.getName())) {
assertThat(method.getParameterTypes().length, greaterThanOrEqualTo(1));
int markerOffset = method.getParameterTypes()[0].equals(Marker.class) ? 1 : 0;
int paramLength = method.getParameterTypes().length - markerOffset;
if (method.isVarArgs()) {
assertEquals(2, paramLength);
assertEquals(String.class, method.getParameterTypes()[markerOffset]);
assertThat(method.getParameterTypes()[markerOffset + 1], Matchers.<Class<?>>isOneOf(Object[].class, Supplier[].class));
} else {
assertThat(method.getParameterTypes()[markerOffset], Matchers.<Class<?>>isOneOf(Message.class, MessageSupplier.class,
CharSequence.class, Object.class, String.class, Supplier.class));
if (paramLength == 2) {
assertThat(method.getParameterTypes()[markerOffset + 1], Matchers.<Class<?>>isOneOf(Throwable.class, Object.class));
if (method.getParameterTypes()[markerOffset + 1].equals(Object.class)) {
assertEquals(String.class, method.getParameterTypes()[markerOffset]);
}
}
if (paramLength > 2) {
assertEquals(String.class, method.getParameterTypes()[markerOffset]);
assertThat(paramLength, lessThanOrEqualTo(11));
for (int i = 1; i < paramLength; i++) {
assertEquals(Object.class, method.getParameterTypes()[markerOffset + i]);
}
}
}
}
}
// currently we have two overloaded methods for each of debug, info, ...
// if that changes, we might want to have another look at the usage checker
assertThat(varargsMethodCount, equalTo(ESLoggerUsageChecker.LOGGER_METHODS.size() * 2));
// check that signature is same as we expect in the usage checker
for (String methodName : ESLoggerUsageChecker.LOGGER_METHODS) {
assertThat(Logger.class.getMethod(methodName, String.class, Object[].class), notNullValue());
assertThat(Logger.class.getMethod(methodName, String.class, Throwable.class, Object[].class), notNullValue());
assertEquals(48, Stream.of(Logger.class.getMethods()).filter(m -> methodName.equals(m.getName())).count());
}
for (Constructor<?> constructor : ParameterizedMessage.class.getConstructors()) {
assertThat(constructor.getParameterTypes().length, greaterThanOrEqualTo(2));
assertEquals(String.class, constructor.getParameterTypes()[0]);
assertThat(constructor.getParameterTypes()[1], Matchers.<Class<?>>isOneOf(String[].class, Object[].class, Object.class));
if (constructor.getParameterTypes().length > 2) {
assertEquals(3, constructor.getParameterTypes().length);
if (constructor.getParameterTypes()[1].equals(Object.class)) {
assertEquals(Object.class, constructor.getParameterTypes()[2]);
} else {
assertEquals(Throwable.class, constructor.getParameterTypes()[2]);
}
}
}
assertEquals(5, ParameterizedMessage.class.getConstructors().length);
}
public void checkNumberOfArguments1() {
@ -110,7 +139,6 @@ public class ESLoggerUsageTests extends ESTestCase {
}
public void checkNumberOfArguments3() {
// long argument list (> 5), emits different bytecode
logger.info("Hello {}, {}, {}, {}, {}, {}, {}", "world", 2, "third argument", 4, 5, 6, new String("last arg"));
}
@ -118,6 +146,30 @@ public class ESLoggerUsageTests extends ESTestCase {
logger.info("Hello {}, {}, {}, {}, {}, {}, {}", "world", 2, "third argument", 4, 5, 6, 7, new String("last arg"));
}
public void checkNumberOfArgumentsParameterizedMessage1() {
logger.info(new ParameterizedMessage("Hello {}, {}, {}", "world", 2, "third argument"));
}
public void checkFailNumberOfArgumentsParameterizedMessage1() {
logger.info(new ParameterizedMessage("Hello {}, {}", "world", 2, "third argument"));
}
public void checkNumberOfArgumentsParameterizedMessage2() {
logger.info(new ParameterizedMessage("Hello {}, {}", "world", 2));
}
public void checkFailNumberOfArgumentsParameterizedMessage2() {
logger.info(new ParameterizedMessage("Hello {}, {}, {}", "world", 2));
}
public void checkNumberOfArgumentsParameterizedMessage3() {
logger.info((Supplier<?>) () -> new ParameterizedMessage("Hello {}, {}, {}", "world", 2, "third argument"));
}
public void checkFailNumberOfArgumentsParameterizedMessage3() {
logger.info((Supplier<?>) () -> new ParameterizedMessage("Hello {}, {}", "world", 2, "third argument"));
}
public void checkOrderOfExceptionArgument() {
logger.info("Hello", new Exception());
}