/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.index.gateway ;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
2010-02-08 15:30:06 +02:00
/ * *
2011-12-06 02:42:25 +02:00
*
2010-02-08 15:30:06 +02:00
* /
2014-12-15 13:47:34 +01:00
public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable {
2010-02-08 15:30:06 +02:00
2014-12-15 13:47:34 +01:00
private final ThreadPool threadPool ;
private final MappingUpdatedAction mappingUpdatedAction ;
private final IndexService indexService ;
private final IndexShard indexShard ;
private final TimeValue waitForMappingUpdatePostRecovery ;
private final TimeValue syncInterval ;
2010-08-17 08:17:29 +03:00
2015-04-16 18:16:00 +02:00
private volatile ScheduledFuture < ? > flushScheduler ;
2014-12-30 17:16:35 +01:00
private final CancellableThreads cancellableThreads = new CancellableThreads ( ) ;
2010-02-08 15:30:06 +02:00
2014-12-15 13:47:34 +01:00
@Inject
public IndexShardGateway ( ShardId shardId , @IndexSettings Settings indexSettings , ThreadPool threadPool , MappingUpdatedAction mappingUpdatedAction ,
IndexService indexService , IndexShard indexShard ) {
super ( shardId , indexSettings ) ;
this . threadPool = threadPool ;
this . mappingUpdatedAction = mappingUpdatedAction ;
this . indexService = indexService ;
this . indexShard = indexShard ;
2015-04-16 18:16:00 +02:00
this . waitForMappingUpdatePostRecovery = indexSettings . getAsTime ( " index.gateway.wait_for_mapping_update_post_recovery " , TimeValue . timeValueMinutes ( 15 ) ) ;
2015-02-27 12:04:11 +01:00
syncInterval = indexSettings . getAsTime ( " index.gateway.sync " , TimeValue . timeValueSeconds ( 5 ) ) ;
2014-12-15 13:47:34 +01:00
if ( syncInterval . millis ( ) > 0 ) {
this . indexShard . translog ( ) . syncOnEachOperation ( false ) ;
flushScheduler = threadPool . schedule ( syncInterval , ThreadPool . Names . SAME , new Sync ( ) ) ;
} else if ( syncInterval . millis ( ) = = 0 ) {
flushScheduler = null ;
this . indexShard . translog ( ) . syncOnEachOperation ( true ) ;
} else {
flushScheduler = null ;
}
}
2015-02-10 08:54:03 +01:00
/ * *
* Recovers the state of the shard from the gateway .
* /
2014-12-15 13:47:34 +01:00
public void recover ( boolean indexShouldExists , RecoveryState recoveryState ) throws IndexShardGatewayRecoveryException {
2015-02-23 15:02:04 +01:00
indexShard . prepareForIndexRecovery ( ) ;
2014-12-15 13:47:34 +01:00
long version = - 1 ;
2015-04-16 18:16:00 +02:00
final Map < String , Mapping > typesToUpdate ;
2015-02-18 14:13:16 +01:00
SegmentInfos si = null ;
2014-12-15 13:47:34 +01:00
indexShard . store ( ) . incRef ( ) ;
try {
try {
indexShard . store ( ) . failIfCorrupted ( ) ;
try {
si = Lucene . readSegmentInfos ( indexShard . store ( ) . directory ( ) ) ;
} catch ( Throwable e ) {
String files = " _unknown_ " ;
try {
files = Arrays . toString ( indexShard . store ( ) . directory ( ) . listAll ( ) ) ;
} catch ( Throwable e1 ) {
files + = " (failure= " + ExceptionsHelper . detailedMessage ( e1 ) + " ) " ;
}
if ( indexShouldExists ) {
throw new IndexShardGatewayRecoveryException ( shardId ( ) , " shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files , e ) ;
}
}
if ( si ! = null ) {
if ( indexShouldExists ) {
version = si . getVersion ( ) ;
} else {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger . trace ( " cleaning existing shard, shouldn't exists " ) ;
IndexWriter writer = new IndexWriter ( indexShard . store ( ) . directory ( ) , new IndexWriterConfig ( Lucene . STANDARD_ANALYZER ) . setOpenMode ( IndexWriterConfig . OpenMode . CREATE ) ) ;
writer . close ( ) ;
2015-04-07 09:25:04 +02:00
recoveryState . getTranslog ( ) . totalOperations ( 0 ) ;
2014-12-15 13:47:34 +01:00
}
}
} catch ( Throwable e ) {
throw new IndexShardGatewayRecoveryException ( shardId ( ) , " failed to fetch index version after copying it over " , e ) ;
}
recoveryState . getIndex ( ) . updateVersion ( version ) ;
// since we recover from local, just fill the files and size
try {
2015-02-10 08:54:03 +01:00
final RecoveryState . Index index = recoveryState . getIndex ( ) ;
2015-02-18 14:13:16 +01:00
if ( si ! = null ) {
final Directory directory = indexShard . store ( ) . directory ( ) ;
for ( String name : Lucene . files ( si ) ) {
long length = directory . fileLength ( name ) ;
2015-02-10 08:54:03 +01:00
index . addFileDetail ( name , length , true ) ;
2015-02-18 14:13:16 +01:00
}
2014-12-15 13:47:34 +01:00
}
2015-02-10 08:54:03 +01:00
} catch ( IOException e ) {
logger . debug ( " failed to list file details " , e ) ;
2014-12-15 13:47:34 +01:00
}
2015-04-07 09:25:04 +02:00
if ( indexShouldExists = = false ) {
2015-03-07 16:50:32 +01:00
recoveryState . getTranslog ( ) . totalOperations ( 0 ) ;
recoveryState . getTranslog ( ) . totalOperationsOnStart ( 0 ) ;
2014-12-15 13:47:34 +01:00
}
2015-04-07 09:25:04 +02:00
typesToUpdate = indexShard . performTranslogRecovery ( ) ;
2014-12-15 13:47:34 +01:00
2015-02-23 15:02:04 +01:00
indexShard . finalizeRecovery ( ) ;
2015-04-16 18:16:00 +02:00
for ( Map . Entry < String , Mapping > entry : typesToUpdate . entrySet ( ) ) {
validateMappingUpdate ( entry . getKey ( ) , entry . getValue ( ) ) ;
}
2015-02-23 15:02:04 +01:00
indexShard . postRecovery ( " post recovery from gateway " ) ;
2015-04-07 09:25:04 +02:00
} catch ( EngineException e ) {
2015-02-17 11:51:03 +01:00
throw new IndexShardGatewayRecoveryException ( shardId , " failed to recovery from gateway " , e ) ;
2014-12-30 17:11:06 +01:00
} finally {
indexShard . store ( ) . decRef ( ) ;
}
2015-04-16 18:16:00 +02:00
}
2014-12-15 13:47:34 +01:00
2015-04-16 18:16:00 +02:00
private void validateMappingUpdate ( final String type , Mapping update ) {
final CountDownLatch latch = new CountDownLatch ( 1 ) ;
final AtomicReference < Throwable > error = new AtomicReference < > ( ) ;
mappingUpdatedAction . updateMappingOnMaster ( indexService . index ( ) . name ( ) , indexService . indexUUID ( ) , type , update , new MappingUpdatedAction . MappingUpdateListener ( ) {
@Override
public void onMappingUpdate ( ) {
latch . countDown ( ) ;
}
@Override
public void onFailure ( Throwable t ) {
latch . countDown ( ) ;
error . set ( t ) ;
}
} ) ;
cancellableThreads . execute ( new CancellableThreads . Interruptable ( ) {
@Override
public void run ( ) throws InterruptedException {
try {
if ( latch . await ( waitForMappingUpdatePostRecovery . millis ( ) , TimeUnit . MILLISECONDS ) = = false ) {
logger . debug ( " waited for mapping update on master for [{}], yet timed out " , type ) ;
} else {
if ( error . get ( ) ! = null ) {
throw new IndexShardGatewayRecoveryException ( shardId , " Failed to propagate mappings on master post recovery " , error . get ( ) ) ;
2014-12-30 17:16:35 +01:00
}
}
2015-04-16 18:16:00 +02:00
} catch ( InterruptedException e ) {
logger . debug ( " interrupted while waiting for mapping update " ) ;
throw e ;
2014-12-15 13:47:34 +01:00
}
2015-04-16 18:16:00 +02:00
}
} ) ;
2014-12-15 13:47:34 +01:00
}
@Override
public void close ( ) {
FutureUtils . cancel ( flushScheduler ) ;
2014-12-30 17:16:35 +01:00
cancellableThreads . cancel ( " closed " ) ;
2014-12-15 13:47:34 +01:00
}
class Sync implements Runnable {
@Override
public void run ( ) {
// don't re-schedule if its closed..., we are done
if ( indexShard . state ( ) = = IndexShardState . CLOSED ) {
return ;
}
if ( indexShard . state ( ) = = IndexShardState . STARTED & & indexShard . translog ( ) . syncNeeded ( ) ) {
threadPool . executor ( ThreadPool . Names . FLUSH ) . execute ( new Runnable ( ) {
@Override
public void run ( ) {
try {
indexShard . translog ( ) . sync ( ) ;
} catch ( Exception e ) {
if ( indexShard . state ( ) = = IndexShardState . STARTED ) {
logger . warn ( " failed to sync translog " , e ) ;
}
}
if ( indexShard . state ( ) ! = IndexShardState . CLOSED ) {
flushScheduler = threadPool . schedule ( syncInterval , ThreadPool . Names . SAME , Sync . this ) ;
}
}
} ) ;
} else {
flushScheduler = threadPool . schedule ( syncInterval , ThreadPool . Names . SAME , Sync . this ) ;
}
}
}
2014-12-15 22:53:58 +01:00
@Override
public String toString ( ) {
return " shard_gateway " ;
}
2010-02-08 15:30:06 +02:00
}