Merge branch 'master' into pr/s3-path-style-access

David Pilato 2016-07-19 15:32:41 +02:00
commit 32a5e16c7d
31 changed files with 785 additions and 204 deletions

View File

@ -120,6 +120,7 @@ public abstract class StreamInput extends InputStream {
* only if you must differentiate null from empty. Use {@link StreamInput#readBytesReference()} and
* {@link StreamOutput#writeBytesReference(BytesReference)} if you do not.
*/
@Nullable
public BytesReference readOptionalBytesReference() throws IOException {
int length = readVInt() - 1;
if (length < 0) {
@ -275,6 +276,14 @@ public abstract class StreamInput extends InputStream {
return BitUtil.zigZagDecode(accumulator | (currentByte << i));
}
@Nullable
public Long readOptionalLong() throws IOException {
if (readBoolean()) {
return readLong();
}
return null;
}
@Nullable
public Text readOptionalText() throws IOException {
int length = readInt();
@ -355,6 +364,7 @@ public abstract class StreamInput extends InputStream {
return Double.longBitsToDouble(readLong());
}
@Nullable
public final Double readOptionalDouble() throws IOException {
if (readBoolean()) {
return readDouble();
@ -402,6 +412,7 @@ public abstract class StreamInput extends InputStream {
return ret;
}
@Nullable
public String[] readOptionalStringArray() throws IOException {
if (readBoolean()) {
return readStringArray();
@ -635,6 +646,7 @@ public abstract class StreamInput extends InputStream {
/**
* Deserializes a potentially null value.
*/
@Nullable
public <T extends Streamable> T readOptionalStreamable(Supplier<T> supplier) throws IOException {
if (readBoolean()) {
T streamable = supplier.get();
@ -645,6 +657,7 @@ public abstract class StreamInput extends InputStream {
}
}
@Nullable
public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> reader) throws IOException {
if (readBoolean()) {
T t = reader.read(this);
@ -769,6 +782,7 @@ public abstract class StreamInput extends InputStream {
/**
* Reads an optional {@link NamedWriteable}.
*/
@Nullable
public <C extends NamedWriteable> C readOptionalNamedWriteable(Class<C> categoryClass) throws IOException {
if (readBoolean()) {
return readNamedWriteable(categoryClass);
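
The @Nullable annotations added above make the existing contract explicit: each readOptional* method first reads a presence flag and may return null, so callers must null-check. A minimal caller sketch, assuming a hypothetical MyStats class whose StreamInput constructor acts as its Writeable.Reader (both invented for illustration):

    // MyStats is hypothetical; readOptionalWriteable returns null when the
    // writer sent a 'false' presence flag instead of a payload.
    @Nullable
    MyStats stats = in.readOptionalWriteable(MyStats::new);
    if (stats == null) {
        // absent on the wire: fall back to a default or skip the field
    }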

View File

@ -237,6 +237,15 @@ public abstract class StreamOutput extends OutputStream {
writeByte((byte) (value & 0x7F));
}
public void writeOptionalLong(@Nullable Long l) throws IOException {
if (l == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeLong(l);
}
}
public void writeOptionalString(@Nullable String str) throws IOException {
if (str == null) {
writeBoolean(false);
@ -314,7 +323,7 @@ public abstract class StreamOutput extends OutputStream {
writeLong(Double.doubleToLongBits(v));
}
public void writeOptionalDouble(Double v) throws IOException {
public void writeOptionalDouble(@Nullable Double v) throws IOException {
if (v == null) {
writeBoolean(false);
} else {
@ -798,7 +807,7 @@ public abstract class StreamOutput extends OutputStream {
/**
* Write an optional {@linkplain DateTimeZone} to the stream.
*/
public void writeOptionalTimeZone(DateTimeZone timeZone) throws IOException {
public void writeOptionalTimeZone(@Nullable DateTimeZone timeZone) throws IOException {
if (timeZone == null) {
writeBoolean(false);
} else {
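
The new writeOptionalLong follows the same wire convention as the other optional writers shown here: a boolean presence flag, then the payload only when present. A minimal round-trip sketch under that assumption (the Ping class and its field are hypothetical):

    import java.io.IOException;
    import org.elasticsearch.common.Nullable;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.io.stream.Writeable;

    public class Ping implements Writeable {
        @Nullable
        private final Long latencyMillis; // legitimately absent on some nodes

        public Ping(@Nullable Long latencyMillis) {
            this.latencyMillis = latencyMillis;
        }

        public Ping(StreamInput in) throws IOException {
            latencyMillis = in.readOptionalLong(); // presence flag, then the long
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeOptionalLong(latencyMillis); // false, or true followed by the long
        }
    }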

View File

@ -416,6 +416,7 @@ public final class ObjectParser<Value, Context extends ParseFieldMatcherSupplier
FLOAT_OR_NULL(VALUE_NUMBER, VALUE_STRING, VALUE_NULL),
DOUBLE(VALUE_NUMBER, VALUE_STRING),
LONG(VALUE_NUMBER, VALUE_STRING),
LONG_OR_NULL(VALUE_NUMBER, VALUE_STRING, VALUE_NULL),
INT(VALUE_NUMBER, VALUE_STRING),
BOOLEAN(VALUE_BOOLEAN),
STRING_ARRAY(START_ARRAY, VALUE_STRING),

View File

@ -382,8 +382,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
if (recoveryState.getSourceNode().equals(sourceNode) == false) {
if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) {
// getting here means that the shard was still recovering
logger.debug("{} removing shard (recovery source changed), current [{}], global [{}])",
shardId, currentRoutingEntry, newShardRouting);
logger.debug("{} removing shard (recovery source changed), current [{}], global [{}], shard [{}])",
shardId, recoveryState.getSourceNode(), sourceNode, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)");
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
@ -31,7 +32,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,10 +48,12 @@ public class RecoveriesCollection {
private final ESLogger logger;
private final ThreadPool threadPool;
private final Callback<Long> ensureClusterStateVersionCallback;
public RecoveriesCollection(ESLogger logger, ThreadPool threadPool) {
public RecoveriesCollection(ESLogger logger, ThreadPool threadPool, Callback<Long> ensureClusterStateVersionCallback) {
this.logger = logger;
this.threadPool = threadPool;
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
}
/**
@ -61,7 +63,7 @@ public class RecoveriesCollection {
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
RecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener);
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
@ -175,7 +177,6 @@ public class RecoveriesCollection {
return cancelled;
}
/**
* a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
@ -44,6 +45,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
/**
* The source recovery accepts recovery requests from other peer shards and starts the recovery process from this
@ -101,13 +103,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
}
ShardRouting targetShardRouting = null;
for (ShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(request.shardId())) {
targetShardRouting = shardRouting;
break;
}
}
ShardRouting targetShardRouting = node.getByShardId(request.shardId());
if (targetShardRouting == null) {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
@ -118,17 +114,8 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
}
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
final RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, logger);
} else {
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt(), logger);
}
ongoingRecoveries.add(shard, handler);
try {
return handler.recoverToTarget();
} finally {
@ -144,38 +131,35 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
}
}
private static final class OngoingRecoveries {
private final Map<IndexShard, Set<RecoverySourceHandler>> ongoingRecoveries = new HashMap<>();
private final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
synchronized void add(IndexShard shard, RecoverySourceHandler handler) {
Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers == null) {
shardRecoveryHandlers = new HashSet<>();
ongoingRecoveries.put(shard, shardRecoveryHandlers);
}
assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]";
shardRecoveryHandlers.add(handler);
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
shard.recoveryStats().incCurrentAsSource();
return handler;
}
synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
final Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]";
boolean remove = shardRecoveryHandlers.remove(handler);
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler);
assert remove : "Handler was not registered [" + handler + "]";
if (remove) {
shard.recoveryStats().decCurrentAsSource();
}
if (shardRecoveryHandlers.isEmpty()) {
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
assert shardRecoveryContext.onNewRecoveryException == null;
}
}
synchronized void cancel(IndexShard shard, String reason) {
final Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers != null) {
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
if (shardRecoveryContext != null) {
final List<Exception> failures = new ArrayList<>();
for (RecoverySourceHandler handlers : shardRecoveryHandlers) {
for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) {
try {
handlers.cancel(reason);
} catch (Exception ex) {
@ -187,6 +171,60 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures);
}
}
private final class ShardRecoveryContext {
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
@Nullable
private DelayRecoveryException onNewRecoveryException;
/**
* Adds a recovery source handler if recoveries are not delayed from starting (see also {@link #delayNewRecoveries}).
* Throws {@link DelayRecoveryException} if new recoveries are delayed from starting.
*/
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
if (onNewRecoveryException != null) {
throw onNewRecoveryException;
}
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
recoveryHandlers.add(handler);
return handler;
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, logger);
} else {
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger);
}
return handler;
}
/**
* Makes new recoveries throw a {@link DelayRecoveryException} with the provided message.
*
* Throws {@link IllegalStateException} if new recoveries are already being delayed.
*/
synchronized Releasable delayNewRecoveries(String exceptionMessage) throws IllegalStateException {
if (onNewRecoveryException != null) {
throw new IllegalStateException("already delaying recoveries");
}
onNewRecoveryException = new DelayRecoveryException(exceptionMessage);
return this::unblockNewRecoveries;
}
private synchronized void unblockNewRecoveries() {
onNewRecoveryException = null;
}
}
}
}
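
delayNewRecoveries is a small guard-object idiom: while the returned Releasable is held, addNewRecovery throws the stored DelayRecoveryException, and closing the guard clears it again. A sketch of the intended call shape (shardContext is hypothetical here; the real caller is the hand-off code in RecoverySourceHandler below):

    // While the guard is held, any incoming StartRecoveryRequest for this shard
    // fails with a retryable DelayRecoveryException.
    try (Releasable ignored = shardContext.delayNewRecoveries("primary relocation hand-off in progress")) {
        // ... perform the relocation hand-off ...
    } // closing the guard unblocks new recoveries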

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -41,6 +42,7 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -53,6 +55,7 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
/**
@ -75,6 +78,8 @@ public class RecoverySourceHandler {
private final int shardId;
// Request containing source and target node information
private final StartRecoveryRequest request;
private final Supplier<Long> currentClusterStateVersionSupplier;
private final Function<String, Releasable> delayNewRecoveries;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
@ -98,11 +103,15 @@ public class RecoverySourceHandler {
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
final Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries,
final int fileChunkSizeInBytes,
final ESLogger logger) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.request = request;
this.currentClusterStateVersionSupplier = currentClusterStateVersionSupplier;
this.delayNewRecoveries = delayNewRecoveries;
this.logger = logger;
this.indexName = this.request.shardId().getIndex().getName();
this.shardId = this.request.shardId().id();
@ -136,6 +145,22 @@ public class RecoverySourceHandler {
}
}
// engine was just started at the end of phase 1
if (shard.state() == IndexShardState.RELOCATED) {
/**
* The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all
* operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the
* local translog and thus will be resent on phase 2. The reason is that an operation replicated by the target primary is
* sent to the recovery target and the local shard (old primary) concurrently, meaning it may have arrived at the recovery
* target before we opened the engine and is still in-flight on the local shard.
*
* Checking the relocated status here, after we opened the engine on the target, is safe because primary relocation waits
* for all ongoing operations to complete and be fully replicated. Therefore all future operations by the new primary are
* guaranteed to reach the target shard when its engine is open.
*/
throw new IndexShardRelocatedException(request.shardId());
}
logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations());
try {
phase2(translogView.snapshot());
@ -362,12 +387,18 @@ public class RecoverySourceHandler {
cancellableThreads.execute(recoveryTarget::finalizeRecovery);
if (isPrimaryRelocation()) {
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
try {
// in case of primary relocation we have to ensure that the cluster state on the primary relocation target has all
// replica shards that have recovered or are still recovering from the current primary, otherwise replication actions
// will not be sent to these replicas. To accomplish this, first block new recoveries, then take the version of the latest
// cluster state. This means that no new recovery can be completed based on information of a newer cluster state than the current one.
try (Releasable ignored = delayNewRecoveries.apply("primary relocation hand-off in progress or completed for " + shardId)) {
final long currentClusterStateVersion = currentClusterStateVersionSupplier.get();
logger.trace("[{}][{}] waiting on {} to have cluster state with version [{}]", indexName, shardId, request.targetNode(),
currentClusterStateVersion);
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
} catch (Exception e) {
logger.debug("[{}][{}] completing relocation hand-off to {} failed", e, indexName, shardId, request.targetNode());
throw e;
}
/**
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -56,10 +57,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link RecoveriesCollection}.
*/
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
private final ESLogger logger;
@ -75,6 +75,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final String tempFilePrefix;
private final Store store;
private final RecoveryTargetService.RecoveryListener listener;
private final Callback<Long> ensureClusterStateVersionCallback;
private final AtomicBoolean finished = new AtomicBoolean();
@ -87,15 +88,26 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId);
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId,
copyFrom.ensureClusterStateVersionCallback);
}
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) {
this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet());
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener,
Callback<Long> ensureClusterStateVersionCallback) {
this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet(), ensureClusterStateVersionCallback);
}
/**
* creates a new recovery target object that represents a recovery to the provided indexShard
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed / failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
* version. Necessary for primary relocation so that the new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler}).
*/
private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener,
CancellableThreads cancellableThreads, long recoveryId) {
CancellableThreads cancellableThreads, long recoveryId, Callback<Long> ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = cancellableThreads;
this.recoveryId = recoveryId;
@ -106,6 +118,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + ".";
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
@ -321,6 +334,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
indexShard().finalizeRecovery();
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.handle(clusterStateVersion);
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer
.BatchOperationException {

View File

@ -43,6 +43,11 @@ public interface RecoveryTargetHandler {
**/
void finalizeRecovery();
/**
* Blocks until a cluster state with a version of at least clusterStateVersion is available
*/
void ensureClusterStateVersion(long clusterStateVersion);
/**
* Index a set of translog operations on the target
* @param operations operations to index

View File

@ -24,6 +24,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -42,11 +43,11 @@ import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
@ -76,6 +77,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
}
private final ThreadPool threadPool;
@ -95,7 +97,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
@ -109,6 +111,8 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
}
@Override
@ -301,6 +305,18 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
}
}
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
@Override
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().ensureClusterStateVersion(request.clusterStateVersion());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
@ -362,6 +378,52 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
}
}
private void waitForClusterState(long clusterStateVersion) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, TimeValue.timeValueMinutes(5), logger,
threadPool.getThreadContext());
final ClusterState clusterState = observer.observedState();
if (clusterState.getVersion() >= clusterStateVersion) {
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
clusterState.getVersion());
return;
} else {
logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion());
final PlainActionFuture<Void> future = new PlainActionFuture<>();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
future.onResponse(null);
}
@Override
public void onClusterServiceClose() {
future.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion));
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
return newState.getVersion() >= clusterStateVersion;
}
});
try {
future.get();
logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion,
observer.observedState().getVersion());
} catch (Exception e) {
logger.debug("failed waiting for cluster state with version {} (current: {})", e, clusterStateVersion,
observer.observedState());
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public class RecoveryWaitForClusterStateRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private long clusterStateVersion;
public RecoveryWaitForClusterStateRequest() {
}
RecoveryWaitForClusterStateRequest(long recoveryId, ShardId shardId, long clusterStateVersion) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.clusterStateVersion = clusterStateVersion;
}
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
public long clusterStateVersion() {
return clusterStateVersion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
clusterStateVersion = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVLong(clusterStateVersion);
}
}
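
Note the mixed encodings in writeTo: recoveryId goes out as a fixed eight-byte long, while clusterStateVersion uses writeVLong, which spends seven payload bits per byte so small non-negative counters usually fit in one or two bytes. A rough re-implementation of the size arithmetic, for illustration only (not the actual StreamOutput code):

    // Bytes a vlong-style encoding needs: 7 payload bits per byte, with the
    // high bit of each byte used as a continuation marker.
    static int vLongSize(long value) {
        int bytes = 1;
        while ((value & ~0x7FL) != 0) { // anything beyond the low 7 bits left?
            value >>>= 7;
            bytes++;
        }
        return bytes;
    }
    // vLongSize(5) == 1 and vLongSize(300) == 2, versus a constant 8 for writeLong.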

View File

@ -89,6 +89,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(

View File

@ -19,11 +19,14 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
@ -34,9 +37,10 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, ESLogger
logger) {
super(shard, recoveryTarget, request, -1, logger);
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries, ESLogger logger) {
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger);
this.shard = shard;
this.request = request;
}

View File

@ -103,9 +103,8 @@ public abstract class AbstractHistogramAggregatorFactory<AF extends AbstractHist
// code so we won't need to do that
ExtendedBounds roundedBounds = null;
if (extendedBounds != null) {
// we need to process & validate here using the parser
extendedBounds.processAndValidate(name, context.searchContext(), config.format());
roundedBounds = extendedBounds.round(rounding);
// parse any string bounds to longs and round them
roundedBounds = extendedBounds.parseAndValidate(name, context.searchContext(), config.format()).round(rounding);
}
return new HistogramAggregator(name, factories, rounding, order, keyed, minDocCount, roundedBounds, valuesSource,
config.format(), histogramFactory, context, parent, pipelineAggregators, metaData);

View File

@ -20,15 +20,17 @@
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.xcontent.AbstractObjectParser.NoContextParser;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.internal.SearchContext;
@ -36,26 +38,91 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Objects;
public class ExtendedBounds implements ToXContent, Writeable {
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class ExtendedBounds implements ToXContent, Writeable {
static final ParseField EXTENDED_BOUNDS_FIELD = new ParseField("extended_bounds");
static final ParseField MIN_FIELD = new ParseField("min");
static final ParseField MAX_FIELD = new ParseField("max");
Long min;
Long max;
String minAsStr;
String maxAsStr;
ExtendedBounds() {} //for parsing
public ExtendedBounds(Long min, Long max) {
this.min = min;
this.max = max;
public static final ConstructingObjectParser<ExtendedBounds, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
"extended_bounds", a -> {
assert a.length == 2;
Long min = null;
Long max = null;
String minAsStr = null;
String maxAsStr = null;
if (a[0] == null) {
// null: no lower bound provided
} else if (a[0] instanceof Long) {
min = (Long) a[0];
} else if (a[0] instanceof String) {
minAsStr = (String) a[0];
} else {
throw new IllegalArgumentException("Unknown field type [" + a[0].getClass() + "]");
}
if (a[1] == null) {
// null: no upper bound provided
} else if (a[1] instanceof Long) {
max = (Long) a[1];
} else if (a[1] instanceof String) {
maxAsStr = (String) a[1];
} else {
throw new IllegalArgumentException("Unknown field type [" + a[1].getClass() + "]");
}
return new ExtendedBounds(min, max, minAsStr, maxAsStr);
});
static {
NoContextParser<Object> longOrString = p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return p.longValue(false);
}
if (p.currentToken() == Token.VALUE_STRING) {
return p.text();
}
if (p.currentToken() == Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
};
PARSER.declareField(optionalConstructorArg(), longOrString, MIN_FIELD, ValueType.LONG_OR_NULL);
PARSER.declareField(optionalConstructorArg(), longOrString, MAX_FIELD, ValueType.LONG_OR_NULL);
}
/**
* Parsed min value. If this is null and {@linkplain #minAsStr} isn't then this must be parsed from {@linkplain #minAsStr}. If this is
* null and {@linkplain #minAsStr} is also null then there is no lower bound.
*/
private final Long min;
/**
* Parsed max value. If this is null and {@linkplain #maxAsStr} isn't then this must be parsed from {@linkplain #maxAsStr}. If this is
* null and {@linkplain #maxAsStr} is also null then there is no upper bound.
*/
private final Long max;
private final String minAsStr;
private final String maxAsStr;
/**
* Construct with parsed bounds.
*/
public ExtendedBounds(Long min, Long max) {
this(min, max, null, null);
}
/**
* Construct with unparsed bounds.
*/
public ExtendedBounds(String minAsStr, String maxAsStr) {
this(null, null, minAsStr, maxAsStr);
}
/**
* Construct with all possible information.
*/
private ExtendedBounds(Long min, Long max, String minAsStr, String maxAsStr) {
this.min = min;
this.max = max;
this.minAsStr = minAsStr;
this.maxAsStr = maxAsStr;
}
@ -64,85 +131,45 @@ public class ExtendedBounds implements ToXContent, Writeable {
* Read from a stream.
*/
public ExtendedBounds(StreamInput in) throws IOException {
if (in.readBoolean()) {
min = in.readLong();
}
if (in.readBoolean()) {
max = in.readLong();
}
min = in.readOptionalLong();
max = in.readOptionalLong();
minAsStr = in.readOptionalString();
maxAsStr = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (min != null) {
out.writeBoolean(true);
out.writeLong(min);
} else {
out.writeBoolean(false);
}
if (max != null) {
out.writeBoolean(true);
out.writeLong(max);
} else {
out.writeBoolean(false);
}
out.writeOptionalLong(min);
out.writeOptionalLong(max);
out.writeOptionalString(minAsStr);
out.writeOptionalString(maxAsStr);
}
void processAndValidate(String aggName, SearchContext context, DocValueFormat format) {
/**
* Parse the bounds and perform any delayed validation. Returns the result of the parsing.
*/
ExtendedBounds parseAndValidate(String aggName, SearchContext context, DocValueFormat format) {
Long min = this.min;
Long max = this.max;
assert format != null;
if (minAsStr != null) {
min = format.parseLong(minAsStr, false, context.nowCallable());
min = format.parseLong(minAsStr, false, context::nowInMillis);
}
if (maxAsStr != null) {
// TODO: Should we rather pass roundUp=true?
max = format.parseLong(maxAsStr, false, context.nowCallable());
max = format.parseLong(maxAsStr, false, context::nowInMillis);
}
if (min != null && max != null && min.compareTo(max) > 0) {
throw new SearchParseException(context, "[extended_bounds.min][" + min + "] cannot be greater than " +
"[extended_bounds.max][" + max + "] for histogram aggregation [" + aggName + "]", null);
}
return new ExtendedBounds(min, max, minAsStr, maxAsStr);
}
ExtendedBounds round(Rounding rounding) {
return new ExtendedBounds(min != null ? rounding.round(min) : null, max != null ? rounding.round(max) : null);
}
public static ExtendedBounds fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher, String aggregationName)
throws IOException {
XContentParser.Token token = null;
String currentFieldName = null;
ExtendedBounds extendedBounds = new ExtendedBounds();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if ("min".equals(currentFieldName)) {
extendedBounds.minAsStr = parser.text();
} else if ("max".equals(currentFieldName)) {
extendedBounds.maxAsStr = parser.text();
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token
+ " in aggregation [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (parseFieldMatcher.match(currentFieldName, MIN_FIELD)) {
extendedBounds.min = parser.longValue(true);
} else if (parseFieldMatcher.match(currentFieldName, MAX_FIELD)) {
extendedBounds.max = parser.longValue(true);
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token
+ " in aggregation [" + aggregationName + "]: [" + currentFieldName + "].");
}
}
}
return extendedBounds;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(EXTENDED_BOUNDS_FIELD.getPreferredName());
@ -162,7 +189,7 @@ public class ExtendedBounds implements ToXContent, Writeable {
@Override
public int hashCode() {
return Objects.hash(min, max);
return Objects.hash(min, max, minAsStr, maxAsStr);
}
@Override
@ -175,6 +202,43 @@ public class ExtendedBounds implements ToXContent, Writeable {
}
ExtendedBounds other = (ExtendedBounds) obj;
return Objects.equals(min, other.min)
&& Objects.equals(min, other.min);
&& Objects.equals(max, other.max)
&& Objects.equals(minAsStr, other.minAsStr)
&& Objects.equals(maxAsStr, other.maxAsStr);
}
public Long getMin() {
return min;
}
public Long getMax() {
return max;
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
if (min != null) {
b.append(min);
if (minAsStr != null) {
b.append('(').append(minAsStr).append(')');
}
} else {
if (minAsStr != null) {
b.append(minAsStr);
}
}
b.append("--");
if (max != null) {
b.append(max);
if (maxAsStr != null) {
b.append('(').append(maxAsStr).append(')');
}
} else {
if (maxAsStr != null) {
b.append(maxAsStr);
}
}
return b.toString();
}
}
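
The refactor makes bounds parsing a two-phase affair: string bounds such as date math cannot be resolved at request-parse time because the field's DocValueFormat and the search clock only exist at execution time, so parseAndValidate defers that work and returns a new, fully parsed instance. A usage sketch under those assumptions (searchContext, format and rounding stand in for the real objects available to the aggregator factory):

    // Request time: keep "now-1h" as a string; nothing can be resolved yet.
    ExtendedBounds bounds = new ExtendedBounds("now-1h", "now");

    // Execution time, inside the factory (see AbstractHistogramAggregatorFactory above):
    ExtendedBounds parsed = bounds.parseAndValidate("my_histo", searchContext, format);
    ExtendedBounds rounded = parsed.round(rounding);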

View File

@ -127,8 +127,11 @@ public class HistogramParser extends NumericValuesSourceParser {
otherOptions.put(HistogramAggregator.ORDER_FIELD, order);
return true;
} else if (parseFieldMatcher.match(currentFieldName, ExtendedBounds.EXTENDED_BOUNDS_FIELD)) {
ExtendedBounds extendedBounds = ExtendedBounds.fromXContent(parser, parseFieldMatcher, aggregationName);
otherOptions.put(ExtendedBounds.EXTENDED_BOUNDS_FIELD, extendedBounds);
try {
otherOptions.put(ExtendedBounds.EXTENDED_BOUNDS_FIELD, ExtendedBounds.PARSER.apply(parser, () -> parseFieldMatcher));
} catch (Exception e) {
throw new ParsingException(parser.getTokenLocation(), "Error parsing [{}]", e, aggregationName);
}
return true;
} else {
return false;

View File

@ -411,9 +411,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
if (bounds != null) {
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
if (bounds.min != null && bounds.max != null) {
long key = bounds.min;
long max = bounds.max;
if (bounds.getMin() != null && bounds.getMax() != null) {
long key = bounds.getMin();
long max = bounds.getMax();
while (key <= max) {
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs,
@ -422,8 +422,8 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
}
} else {
if (bounds.min != null) {
long key = bounds.min;
if (bounds.getMin() != null) {
long key = bounds.getMin();
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(getFactory().createBucket(key, 0,
@ -454,9 +454,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
if (bounds != null && lastBucket != null && bounds.getMax() != null && bounds.getMax() > lastBucket.key) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
long max = bounds.getMax();
while (key <= max) {
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs, keyed,

View File

@ -119,10 +119,10 @@ public class RangeAggregator extends BucketsAggregator {
Double from = this.from;
Double to = this.to;
if (fromAsStr != null) {
from = parser.parseDouble(fromAsStr, false, context.nowCallable());
from = parser.parseDouble(fromAsStr, false, context::nowInMillis);
}
if (toAsStr != null) {
to = parser.parseDouble(toAsStr, false, context.nowCallable());
to = parser.parseDouble(toAsStr, false, context::nowInMillis);
}
return new Range(key, from, fromAsStr, to, toAsStr);
}

View File

@ -97,7 +97,7 @@ public class AggregationContext {
} else {
if (config.fieldContext() != null && config.fieldContext().fieldType() != null) {
missing = config.fieldContext().fieldType().docValueFormat(null, DateTimeZone.UTC)
.parseDouble(config.missing().toString(), false, context.nowCallable());
.parseDouble(config.missing().toString(), false, context::nowInMillis);
} else {
missing = Double.parseDouble(config.missing().toString());
}

View File

@ -143,15 +143,6 @@ public abstract class SearchContext implements Releasable {
return nowInMillisImpl();
}
public final Callable<Long> nowCallable() {
return new Callable<Long>() {
@Override
public Long call() throws Exception {
return nowInMillis();
}
};
};
public final boolean nowInMillisUsed() {
return nowInMillisUsed;
}
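
Deleting nowCallable works because a zero-argument instance method returning long can back a Callable<Long> (or any matching functional interface) directly as a method reference, with autoboxing bridging the primitive return. A self-contained illustration of the same shape (the Clock class is invented for the example):

    import java.util.concurrent.Callable;

    class Clock {
        long nowInMillis() { return System.currentTimeMillis(); }

        public static void main(String[] args) throws Exception {
            Clock clock = new Clock();
            // The reference adapts on the spot, so no hand-written anonymous
            // Callable like the removed nowCallable() is needed.
            Callable<Long> now = clock::nowInMillis;
            System.out.println(now.call());
        }
    }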

View File

@ -266,6 +266,7 @@ public class BytesStreamsTests extends ESTestCase {
out.writeVInt(2);
out.writeLong(-3);
out.writeVLong(4);
out.writeOptionalLong(11234234L);
out.writeFloat(1.1f);
out.writeDouble(2.2);
int[] intArray = {1, 2, 3};
@ -299,8 +300,9 @@ public class BytesStreamsTests extends ESTestCase {
assertThat(in.readShort(), equalTo((short)-1));
assertThat(in.readInt(), equalTo(-1));
assertThat(in.readVInt(), equalTo(2));
assertThat(in.readLong(), equalTo((long)-3));
assertThat(in.readVLong(), equalTo((long)4));
assertThat(in.readLong(), equalTo(-3L));
assertThat(in.readVLong(), equalTo(4L));
assertThat(in.readOptionalLong(), equalTo(11234234L));
assertThat((double)in.readFloat(), closeTo(1.1, 0.0001));
assertThat(in.readDouble(), closeTo(2.2, 0.0001));
assertThat(in.readGenericValue(), equalTo((Object) intArray));

View File

@ -139,7 +139,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
new BiFunction<IndexShard, DiscoveryNode, RecoveryTarget>() {
@Override
public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) {
return new RecoveryTarget(indexShard, node, recoveryListener) {
return new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) {
@Override
public void renameAllTempFiles() throws IOException {
super.renameAllTempFiles();
@ -274,7 +274,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
for (IndexShard replicaShard : replicas) {
recoverReplica(replicaShard, (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener));
recoverReplica(replicaShard,
(replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener, version -> {}));
}
}
@ -302,8 +303,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, (int) ByteSizeUnit.MB.toKB(1),
logger);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {},
(int) ByteSizeUnit.MB.toKB(1), logger);
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));

View File

@ -66,7 +66,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard,
DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, ESLogger logger) {
super(shard, sourceNode, listener);
super(shard, sourceNode, listener, version -> {});
this.recoveryBlocked = recoveryBlocked;
this.releaseRecovery = releaseRecovery;
this.stageToBlock = stageToBlock;

View File

@ -24,6 +24,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.store.BaseDirectoryWrapper;
@ -35,15 +36,20 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@ -55,9 +61,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RecoverySourceHandlerTests extends ESTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build());
@ -73,8 +85,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(),
logger);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger);
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
@ -125,7 +137,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
@ -188,7 +201,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
@ -237,6 +251,99 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IOUtils.close(store, targetStore);
}
public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Completed() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.REPLICA, randomLong());
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
RecoverySourceHandler handler = new RecoverySourceHandler(shard, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
phase1Called.set(true);
}
@Override
public void phase2(Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
expectThrows(IndexShardRelocatedException.class, () -> handler.recoverToTarget());
assertTrue(phase1Called.get());
assertFalse(phase2Called.get());
}
public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, InterruptedException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.PRIMARY_RELOCATION, randomLong());
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
AtomicBoolean ensureClusterStateVersionCalled = new AtomicBoolean();
AtomicBoolean recoveriesDelayed = new AtomicBoolean();
AtomicBoolean relocated = new AtomicBoolean();
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).then(i -> relocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED);
doAnswer(i -> {
relocated.set(true);
assertTrue(recoveriesDelayed.get());
return null;
}).when(shard).relocated(any(String.class));
RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class);
final Supplier<Long> currentClusterStateVersionSupplier = () -> {
assertFalse(ensureClusterStateVersionCalled.get());
assertTrue(recoveriesDelayed.get());
ensureClusterStateVersionCalled.set(true);
return 0L;
};
final Function<String, Releasable> delayNewRecoveries = s -> {
assertTrue(phase1Called.get());
assertTrue(phase2Called.get());
assertFalse(recoveriesDelayed.get());
recoveriesDelayed.set(true);
return () -> {
assertTrue(recoveriesDelayed.get());
recoveriesDelayed.set(false);
};
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, targetHandler, request, currentClusterStateVersionSupplier,
delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
phase1Called.set(true);
}
@Override
public void phase2(Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
handler.recoverToTarget();
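        // Every step must have run, and the delay on new recoveries must have been released.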
assertTrue(ensureClusterStateVersionCalled.get());
assertTrue(phase1Called.get());
assertTrue(phase2Called.get());
assertTrue(relocated.get());
assertFalse(recoveriesDelayed.get());
}
private Store newStore(Path path) throws IOException {
return newStore(path, true);
}

View File

@ -51,7 +51,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
}
});
}, version -> {});
try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw"), status.store())) {
indexOutput.writeInt(1);
IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar");

View File

@ -19,34 +19,20 @@
package org.elasticsearch.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@ -65,7 +51,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testLastAccessTimeUpdate() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
final long lastSeenTime = status.status().lastAccessTime();
@ -82,22 +68,22 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryTimeout() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(),
                new RecoveryTargetService.RecoveryListener() {
                    @Override
                    public void onRecoveryDone(RecoveryState state) {
                        latch.countDown();
                    }
                    @Override
                    public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
                        failed.set(true);
                        latch.countDown();
                    }
                }, TimeValue.timeValueMillis(100));
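            // The 100ms activity timeout should trip and report the recovery as failed.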
try {
latch.await(30, TimeUnit.SECONDS);
assertTrue("recovery failed to timeout", failed.get());
@ -110,7 +96,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryCancellation() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
@ -129,7 +115,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
shards.startAll();
int numDocs = randomIntBetween(1, 15);
shards.indexDocs(numDocs);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
IndexShard shard = shards.addReplica();
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard);
try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) {

View File

@ -444,7 +444,7 @@ public class RelocationIT extends ESIntegTestCase {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // NORELEASE: set to randomInt(halfNodes - 1) once replica data loss is fixed
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
));
assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
int numDocs = randomIntBetween(100, 150);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
public class DateHistogramTests extends BaseAggregationTestCase<DateHistogramAggregationBuilder> {
@ -62,9 +63,7 @@ public class DateHistogramTests extends BaseAggregationTestCase<DateHistogramAgg
}
}
if (randomBoolean()) {
long extendedBoundsMin = randomIntBetween(-100000, 100000);
long extendedBoundsMax = randomIntBetween((int) extendedBoundsMin, 200000);
factory.extendedBounds(new ExtendedBounds(extendedBoundsMin, extendedBoundsMax));
factory.extendedBounds(ExtendedBoundsTests.randomExtendedBounds());
}
if (randomBoolean()) {
factory.format("###.##");

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
@ -32,9 +33,7 @@ public class HistogramTests extends BaseAggregationTestCase<HistogramAggregation
factory.field(INT_FIELD_NAME);
factory.interval(randomIntBetween(1, 100000));
if (randomBoolean()) {
long extendedBoundsMin = randomIntBetween(-100000, 100000);
long extendedBoundsMax = randomIntBetween((int) extendedBoundsMin, 200000);
factory.extendedBounds(new ExtendedBounds(extendedBoundsMin, extendedBoundsMax));
factory.extendedBounds(ExtendedBoundsTests.randomExtendedBounds());
}
if (randomBoolean()) {
factory.format("###.##");

View File

@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import java.io.IOException;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExtendedBoundsTests extends ESTestCase {
/**
* Construct a random {@link ExtendedBounds}.
*/
public static ExtendedBounds randomExtendedBounds() {
ExtendedBounds bounds = randomParsedExtendedBounds();
if (randomBoolean()) {
bounds = unparsed(bounds);
}
return bounds;
}
/**
* Construct a random {@link ExtendedBounds} in pre-parsed form.
*/
public static ExtendedBounds randomParsedExtendedBounds() {
if (randomBoolean()) {
// Construct with one missing bound
if (randomBoolean()) {
return new ExtendedBounds(null, randomLong());
}
return new ExtendedBounds(randomLong(), null);
}
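        // Otherwise pick two distinct random longs and order them.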
long a = randomLong();
long b;
do {
b = randomLong();
} while (a == b);
long min = min(a, b);
long max = max(a, b);
return new ExtendedBounds(min, max);
}
/**
     * Convert an {@link ExtendedBounds} in parsed form into one in unparsed form.
*/
public static ExtendedBounds unparsed(ExtendedBounds template) {
// It'd probably be better to randomize the formatter
FormatDateTimeFormatter formatter = Joda.forPattern("dateOptionalTime");
String minAsStr = template.getMin() == null ? null : formatter.printer().print(new Instant(template.getMin()));
String maxAsStr = template.getMax() == null ? null : formatter.printer().print(new Instant(template.getMax()));
return new ExtendedBounds(minAsStr, maxAsStr);
}
public void testParseAndValidate() {
long now = randomLong();
SearchContext context = mock(SearchContext.class);
when(context.nowInMillis()).thenReturn(now);
FormatDateTimeFormatter formatter = Joda.forPattern("dateOptionalTime");
DocValueFormat format = new DocValueFormat.DateTime(formatter, DateTimeZone.UTC);
ExtendedBounds expected = randomParsedExtendedBounds();
ExtendedBounds parsed = unparsed(expected).parseAndValidate("test", context, format);
        // parsed won't *equal* expected because equals() also compares the String parts
assertEquals(expected.getMin(), parsed.getMin());
assertEquals(expected.getMax(), parsed.getMax());
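        // A bound of "now" should resolve to the context's current time.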
parsed = new ExtendedBounds("now", null).parseAndValidate("test", context, format);
assertEquals(now, (long) parsed.getMin());
assertNull(parsed.getMax());
parsed = new ExtendedBounds(null, "now").parseAndValidate("test", context, format);
assertNull(parsed.getMin());
assertEquals(now, (long) parsed.getMax());
SearchParseException e = expectThrows(SearchParseException.class,
() -> new ExtendedBounds(100L, 90L).parseAndValidate("test", context, format));
assertEquals("[extended_bounds.min][100] cannot be greater than [extended_bounds.max][90] for histogram aggregation [test]",
e.getMessage());
e = expectThrows(SearchParseException.class,
() -> unparsed(new ExtendedBounds(100L, 90L)).parseAndValidate("test", context, format));
assertEquals("[extended_bounds.min][100] cannot be greater than [extended_bounds.max][90] for histogram aggregation [test]",
e.getMessage());
}
public void testTransportRoundTrip() throws IOException {
ExtendedBounds orig = randomExtendedBounds();
BytesReference origBytes;
try (BytesStreamOutput out = new BytesStreamOutput()) {
orig.writeTo(out);
origBytes = out.bytes();
}
ExtendedBounds read;
try (StreamInput in = origBytes.streamInput()) {
read = new ExtendedBounds(in);
assertEquals("read fully", 0, in.available());
}
assertEquals(orig, read);
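        // Re-serializing the deserialized copy must produce byte-for-byte identical output.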
BytesReference readBytes;
try (BytesStreamOutput out = new BytesStreamOutput()) {
read.writeTo(out);
readBytes = out.bytes();
}
assertEquals(origBytes, readBytes);
}
public void testXContentRoundTrip() throws Exception {
ExtendedBounds orig = randomExtendedBounds();
try (XContentBuilder out = JsonXContent.contentBuilder()) {
orig.toXContent(out, ToXContent.EMPTY_PARAMS);
try (XContentParser in = JsonXContent.jsonXContent.createParser(out.bytes())) {
in.nextToken();
ExtendedBounds read = ExtendedBounds.PARSER.apply(in, () -> ParseFieldMatcher.STRICT);
assertEquals(orig, read);
} catch (Exception e) {
throw new Exception("Error parsing [" + out.bytes().utf8ToString() + "]", e);
}
}
}
}

View File

@ -664,8 +664,16 @@ public class ElasticsearchAssertions {
newInstance.readFrom(input);
assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(),
equalTo(0));
assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]",
serialize(version, streamable), equalTo(orig));
BytesReference newBytes = serialize(version, streamable);
if (false == orig.equals(newBytes)) {
// The bytes are different. That is a failure. Let's try to throw a useful exception for debugging.
String message = "Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable
+ "]";
// If the bytes are different then comparing BytesRef's toStrings will show you *where* they are different
assertEquals(message, orig.toBytesRef().toString(), newBytes.toBytesRef().toString());
// The toStrings were equal even though the bytes were not. Very, very weird.
fail(message);
}
} catch (Exception ex) {
throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex);
}