2010-07-13 18:01:57 +03:00
/ *
2011-12-06 02:42:25 +02:00
* Licensed to ElasticSearch and Shay Banon under one
2010-07-13 18:01:57 +03:00
* or more contributor license agreements . See the NOTICE file
* distributed with this work for additional information
2011-12-06 02:42:25 +02:00
* regarding copyright ownership . ElasticSearch licenses this
2010-07-13 18:01:57 +03:00
* file to you under the Apache License , Version 2 . 0 ( the
* " License " ) ; you may not use this file except in compliance
* with the License . You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing ,
* software distributed under the License is distributed on an
* " AS IS " BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND , either express or implied . See the License for the
* specific language governing permissions and limitations
* under the License .
* /
package org.elasticsearch.cluster.metadata ;
2011-12-06 02:42:25 +02:00
import com.google.common.collect.Lists ;
import com.google.common.collect.Maps ;
import com.google.common.collect.Sets ;
2012-02-15 11:45:58 +02:00
import com.google.common.io.Closeables ;
2010-11-26 15:45:18 +02:00
import org.elasticsearch.ElasticSearchException ;
2012-01-24 13:28:15 +02:00
import org.elasticsearch.Version ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.cluster.ClusterService ;
import org.elasticsearch.cluster.ClusterState ;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask ;
2011-04-27 00:24:27 +03:00
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction ;
2010-08-29 01:24:23 +03:00
import org.elasticsearch.cluster.block.ClusterBlock ;
import org.elasticsearch.cluster.block.ClusterBlocks ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.cluster.routing.RoutingTable ;
2011-09-06 17:11:55 +03:00
import org.elasticsearch.cluster.routing.allocation.AllocationService ;
2010-09-21 11:37:36 +02:00
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation ;
2013-02-05 22:17:49 +01:00
import org.elasticsearch.common.Priority ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.common.Strings ;
import org.elasticsearch.common.component.AbstractComponent ;
2010-08-19 17:06:36 +03:00
import org.elasticsearch.common.compress.CompressedString ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.common.inject.Inject ;
import org.elasticsearch.common.io.Streams ;
2010-11-26 15:45:18 +02:00
import org.elasticsearch.common.regex.Regex ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.common.settings.ImmutableSettings ;
import org.elasticsearch.common.settings.Settings ;
import org.elasticsearch.common.unit.TimeValue ;
2010-11-26 15:45:18 +02:00
import org.elasticsearch.common.xcontent.XContentFactory ;
import org.elasticsearch.common.xcontent.XContentHelper ;
2012-02-15 11:45:58 +02:00
import org.elasticsearch.common.xcontent.XContentParser ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.env.Environment ;
import org.elasticsearch.index.Index ;
2010-08-04 09:02:10 +03:00
import org.elasticsearch.index.mapper.DocumentMapper ;
import org.elasticsearch.index.mapper.MapperParsingException ;
import org.elasticsearch.index.mapper.MapperService ;
2011-01-13 16:20:31 +02:00
import org.elasticsearch.index.percolator.PercolatorService ;
2010-08-04 09:02:10 +03:00
import org.elasticsearch.index.service.IndexService ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.indices.IndexAlreadyExistsException ;
2010-08-04 09:02:10 +03:00
import org.elasticsearch.indices.IndicesService ;
2010-07-13 18:01:57 +03:00
import org.elasticsearch.indices.InvalidIndexNameException ;
2010-09-21 13:35:09 +02:00
import org.elasticsearch.river.RiverIndexName ;
2011-04-27 00:24:27 +03:00
import org.elasticsearch.threadpool.ThreadPool ;
2010-07-13 18:01:57 +03:00
import java.io.File ;
import java.io.FileReader ;
import java.io.IOException ;
2011-12-06 02:42:25 +02:00
import java.util.* ;
2011-04-27 00:24:27 +03:00
import java.util.concurrent.ScheduledFuture ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicInteger ;
2010-07-13 18:01:57 +03:00
2012-05-07 14:27:30 +03:00
import static com.google.common.collect.Maps.newHashMap ;
2011-12-06 02:42:25 +02:00
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder ;
2010-07-13 18:01:57 +03:00
import static org.elasticsearch.cluster.metadata.IndexMetaData.* ;
2011-12-06 02:42:25 +02:00
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder ;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder ;
2010-07-13 18:01:57 +03:00
/ * *
2011-12-06 02:42:25 +02:00
*
2010-07-13 18:01:57 +03:00
* /
public class MetaDataCreateIndexService extends AbstractComponent {
// Collaborators; every one of them is supplied through the injected constructor below.

private final Environment environment;

private final ThreadPool threadPool;

private final ClusterService clusterService;

private final IndicesService indicesService;

private final AllocationService allocationService;

private final NodeIndexCreatedAction nodeIndexCreatedAction;

private final MetaDataService metaDataService;

// Injected via @RiverIndexName; compared against the requested index name when
// validating the name and when choosing default shard / replica counts.
private final String riverIndexName;

/**
 * Stores each collaborator in the field of the same name and passes
 * {@code settings} on to the superclass constructor.
 */
@Inject
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool,
                                  ClusterService clusterService, IndicesService indicesService,
                                  AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction,
                                  MetaDataService metaDataService, @RiverIndexName String riverIndexName) {
    super(settings);
    this.riverIndexName = riverIndexName;
    this.metaDataService = metaDataService;
    this.nodeIndexCreatedAction = nodeIndexCreatedAction;
    this.allocationService = allocationService;
    this.indicesService = indicesService;
    this.clusterService = clusterService;
    this.threadPool = threadPool;
    this.environment = environment;
}
2011-04-27 00:24:27 +03:00
public void createIndex ( final Request request , final Listener userListener ) {
2010-09-04 14:04:51 +03:00
ImmutableSettings . Builder updatedSettingsBuilder = ImmutableSettings . settingsBuilder ( ) ;
for ( Map . Entry < String , String > entry : request . settings . getAsMap ( ) . entrySet ( ) ) {
if ( ! entry . getKey ( ) . startsWith ( " index. " ) ) {
updatedSettingsBuilder . put ( " index. " + entry . getKey ( ) , entry . getValue ( ) ) ;
} else {
updatedSettingsBuilder . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
}
}
request . settings ( updatedSettingsBuilder . build ( ) ) ;
2011-04-27 00:24:27 +03:00
2011-09-02 09:36:25 +03:00
// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
MetaDataService . MdLock mdLock = metaDataService . indexMetaDataLock ( request . index ) ;
try {
mdLock . lock ( ) ;
} catch ( InterruptedException e ) {
userListener . onFailure ( e ) ;
return ;
}
final CreateIndexListener listener = new CreateIndexListener ( mdLock , request , userListener ) ;
2010-09-04 14:04:51 +03:00
2013-02-05 22:17:49 +01:00
clusterService . submitStateUpdateTask ( " create-index [ " + request . index + " ], cause [ " + request . cause + " ] " , Priority . URGENT , new ProcessedClusterStateUpdateTask ( ) {
2011-12-06 02:42:25 +02:00
@Override
public ClusterState execute ( ClusterState currentState ) {
2013-01-30 10:39:38 -05:00
boolean indexCreated = false ;
String failureReason = null ;
2010-07-13 18:01:57 +03:00
try {
2011-04-17 18:39:00 +03:00
try {
validate ( request , currentState ) ;
2013-04-04 22:58:20 +02:00
} catch ( Throwable e ) {
2011-04-17 18:39:00 +03:00
listener . onFailure ( e ) ;
return currentState ;
2010-07-13 18:01:57 +03:00
}
2010-11-26 15:45:18 +02:00
// we only find a template when its an API call (a new index)
2011-04-17 18:39:00 +03:00
// find templates, highest order are better matching
2012-02-15 11:45:58 +02:00
List < IndexTemplateMetaData > templates = findTemplates ( request , currentState ) ;
2010-11-26 15:45:18 +02:00
2012-05-07 14:00:37 +03:00
Map < String , Custom > customs = Maps . newHashMap ( ) ;
2010-11-26 15:45:18 +02:00
// add the request mapping
Map < String , Map < String , Object > > mappings = Maps . newHashMap ( ) ;
2012-05-13 14:11:19 +03:00
// if its a _percolator index, don't index the query object
if ( request . index . equals ( PercolatorService . INDEX_NAME ) ) {
mappings . put ( MapperService . DEFAULT_MAPPING , parseMapping ( " { \ n " +
" \" _default_ \" :{ \ n " +
" \" properties \" : { \ n " +
" \" query \" : { \ n " +
" \" type \" : \" object \" , \ n " +
" \" enabled \" : false \ n " +
" } \ n " +
" } \ n " +
" } \ n " +
" } " ) ) ;
}
2010-11-26 15:45:18 +02:00
for ( Map . Entry < String , String > entry : request . mappings . entrySet ( ) ) {
mappings . put ( entry . getKey ( ) , parseMapping ( entry . getValue ( ) ) ) ;
2010-07-13 18:01:57 +03:00
}
2012-05-07 14:27:30 +03:00
for ( Map . Entry < String , Custom > entry : request . customs . entrySet ( ) ) {
customs . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
}
2012-05-07 14:00:37 +03:00
2010-11-26 15:45:18 +02:00
// apply templates, merging the mappings into the request mapping if exists
for ( IndexTemplateMetaData template : templates ) {
for ( Map . Entry < String , CompressedString > entry : template . mappings ( ) . entrySet ( ) ) {
if ( mappings . containsKey ( entry . getKey ( ) ) ) {
XContentHelper . mergeDefaults ( mappings . get ( entry . getKey ( ) ) , parseMapping ( entry . getValue ( ) . string ( ) ) ) ;
} else {
mappings . put ( entry . getKey ( ) , parseMapping ( entry . getValue ( ) . string ( ) ) ) ;
}
2010-07-13 18:01:57 +03:00
}
2012-05-07 14:00:37 +03:00
// handle custom
for ( Map . Entry < String , Custom > customEntry : template . customs ( ) . entrySet ( ) ) {
String type = customEntry . getKey ( ) ;
IndexMetaData . Custom custom = customEntry . getValue ( ) ;
IndexMetaData . Custom existing = customs . get ( type ) ;
if ( existing = = null ) {
customs . put ( type , custom ) ;
} else {
IndexMetaData . Custom merged = IndexMetaData . lookupFactorySafe ( type ) . merge ( existing , custom ) ;
customs . put ( type , merged ) ;
}
}
2010-11-26 15:45:18 +02:00
}
// now add config level mappings
2011-04-17 18:39:00 +03:00
File mappingsDir = new File ( environment . configFile ( ) , " mappings " ) ;
if ( mappingsDir . exists ( ) & & mappingsDir . isDirectory ( ) ) {
// first index level
File indexMappingsDir = new File ( mappingsDir , request . index ) ;
if ( indexMappingsDir . exists ( ) & & indexMappingsDir . isDirectory ( ) ) {
addMappings ( mappings , indexMappingsDir ) ;
}
2010-11-26 15:45:18 +02:00
2011-04-17 18:39:00 +03:00
// second is the _default mapping
File defaultMappingsDir = new File ( mappingsDir , " _default " ) ;
if ( defaultMappingsDir . exists ( ) & & defaultMappingsDir . isDirectory ( ) ) {
addMappings ( mappings , defaultMappingsDir ) ;
2010-07-13 18:01:57 +03:00
}
}
2010-11-26 15:45:18 +02:00
ImmutableSettings . Builder indexSettingsBuilder = settingsBuilder ( ) ;
// apply templates, here, in reverse order, since first ones are better matching
for ( int i = templates . size ( ) - 1 ; i > = 0 ; i - - ) {
indexSettingsBuilder . put ( templates . get ( i ) . settings ( ) ) ;
2010-08-19 17:06:36 +03:00
}
2010-11-26 15:45:18 +02:00
// now, put the request settings, so they override templates
indexSettingsBuilder . put ( request . settings ) ;
2010-07-13 18:01:57 +03:00
2011-01-13 16:20:31 +02:00
if ( request . index . equals ( PercolatorService . INDEX_NAME ) ) {
// if its percolator, always 1 shard
indexSettingsBuilder . put ( SETTING_NUMBER_OF_SHARDS , 1 ) ;
} else {
if ( indexSettingsBuilder . get ( SETTING_NUMBER_OF_SHARDS ) = = null ) {
if ( request . index . equals ( riverIndexName ) ) {
indexSettingsBuilder . put ( SETTING_NUMBER_OF_SHARDS , settings . getAsInt ( SETTING_NUMBER_OF_SHARDS , 1 ) ) ;
} else {
indexSettingsBuilder . put ( SETTING_NUMBER_OF_SHARDS , settings . getAsInt ( SETTING_NUMBER_OF_SHARDS , 5 ) ) ;
}
2010-09-21 15:07:51 +02:00
}
2010-07-13 18:01:57 +03:00
}
2011-01-13 16:20:31 +02:00
if ( request . index . equals ( PercolatorService . INDEX_NAME ) ) {
// if its percolator, always set number of replicas to 0, and expand to 0-all
indexSettingsBuilder . put ( SETTING_NUMBER_OF_REPLICAS , 0 ) ;
indexSettingsBuilder . put ( SETTING_AUTO_EXPAND_REPLICAS , " 0-all " ) ;
} else {
if ( indexSettingsBuilder . get ( SETTING_NUMBER_OF_REPLICAS ) = = null ) {
if ( request . index . equals ( riverIndexName ) ) {
indexSettingsBuilder . put ( SETTING_NUMBER_OF_REPLICAS , settings . getAsInt ( SETTING_NUMBER_OF_REPLICAS , 1 ) ) ;
} else {
indexSettingsBuilder . put ( SETTING_NUMBER_OF_REPLICAS , settings . getAsInt ( SETTING_NUMBER_OF_REPLICAS , 1 ) ) ;
}
2010-09-21 15:07:51 +02:00
}
2010-07-13 18:01:57 +03:00
}
2012-01-24 13:28:15 +02:00
2012-04-11 20:53:01 +03:00
if ( settings . get ( SETTING_AUTO_EXPAND_REPLICAS ) ! = null & & indexSettingsBuilder . get ( SETTING_AUTO_EXPAND_REPLICAS ) = = null ) {
indexSettingsBuilder . put ( SETTING_AUTO_EXPAND_REPLICAS , settings . get ( SETTING_AUTO_EXPAND_REPLICAS ) ) ;
}
2012-01-24 13:28:15 +02:00
indexSettingsBuilder . put ( SETTING_VERSION_CREATED , Version . CURRENT ) ;
2010-07-13 18:01:57 +03:00
Settings actualIndexSettings = indexSettingsBuilder . build ( ) ;
2010-11-26 15:45:18 +02:00
// Set up everything, now locally create the index to see that things are ok, and apply
2010-08-04 09:02:10 +03:00
// create the index here (on the master) to validate it can be created, as well as adding the mapping
indicesService . createIndex ( request . index , actualIndexSettings , clusterService . state ( ) . nodes ( ) . localNode ( ) . id ( ) ) ;
2013-01-30 10:39:38 -05:00
indexCreated = true ;
2010-08-04 09:02:10 +03:00
// now add the mappings
IndexService indexService = indicesService . indexServiceSafe ( request . index ) ;
MapperService mapperService = indexService . mapperService ( ) ;
2010-11-27 23:30:18 +02:00
// first, add the default mapping
if ( mappings . containsKey ( MapperService . DEFAULT_MAPPING ) ) {
try {
2012-11-23 16:55:52 -05:00
mapperService . merge ( MapperService . DEFAULT_MAPPING , XContentFactory . jsonBuilder ( ) . map ( mappings . get ( MapperService . DEFAULT_MAPPING ) ) . string ( ) , false ) ;
2010-11-27 23:30:18 +02:00
} catch ( Exception e ) {
2013-01-30 10:39:38 -05:00
failureReason = " failed on parsing default mapping on index creation " ;
2010-11-27 23:30:18 +02:00
throw new MapperParsingException ( " mapping [ " + MapperService . DEFAULT_MAPPING + " ] " , e ) ;
}
}
2010-11-26 15:45:18 +02:00
for ( Map . Entry < String , Map < String , Object > > entry : mappings . entrySet ( ) ) {
2010-11-27 23:30:18 +02:00
if ( entry . getKey ( ) . equals ( MapperService . DEFAULT_MAPPING ) ) {
continue ;
}
2010-08-04 09:02:10 +03:00
try {
2012-11-09 17:21:25 +01:00
// apply the default here, its the first time we parse it
2012-11-23 16:55:52 -05:00
mapperService . merge ( entry . getKey ( ) , XContentFactory . jsonBuilder ( ) . map ( entry . getValue ( ) ) . string ( ) , true ) ;
2010-08-04 09:02:10 +03:00
} catch ( Exception e ) {
2013-01-30 10:39:38 -05:00
failureReason = " failed on parsing mappings on index creation " ;
2010-08-04 09:02:10 +03:00
throw new MapperParsingException ( " mapping [ " + entry . getKey ( ) + " ] " , e ) ;
}
}
// now, update the mappings with the actual source
2010-11-15 21:21:18 +02:00
Map < String , MappingMetaData > mappingsMetaData = Maps . newHashMap ( ) ;
2010-08-04 09:02:10 +03:00
for ( DocumentMapper mapper : mapperService ) {
2010-11-15 21:21:18 +02:00
MappingMetaData mappingMd = new MappingMetaData ( mapper ) ;
mappingsMetaData . put ( mapper . type ( ) , mappingMd ) ;
2010-08-04 09:02:10 +03:00
}
2010-08-20 00:45:17 +03:00
final IndexMetaData . Builder indexMetaDataBuilder = newIndexMetaDataBuilder ( request . index ) . settings ( actualIndexSettings ) ;
2010-11-15 21:21:18 +02:00
for ( MappingMetaData mappingMd : mappingsMetaData . values ( ) ) {
indexMetaDataBuilder . putMapping ( mappingMd ) ;
2010-07-13 18:01:57 +03:00
}
2012-05-07 14:00:37 +03:00
for ( Map . Entry < String , Custom > customEntry : customs . entrySet ( ) ) {
indexMetaDataBuilder . putCustom ( customEntry . getKey ( ) , customEntry . getValue ( ) ) ;
}
2010-10-23 21:52:09 +02:00
indexMetaDataBuilder . state ( request . state ) ;
2013-01-30 10:39:38 -05:00
final IndexMetaData indexMetaData ;
try {
indexMetaData = indexMetaDataBuilder . build ( ) ;
} catch ( Exception e ) {
failureReason = " failed to build index metadata " ;
throw e ;
}
2010-08-04 09:02:10 +03:00
2010-07-13 18:01:57 +03:00
MetaData newMetaData = newMetaDataBuilder ( )
. metaData ( currentState . metaData ( ) )
2012-01-22 23:34:34 +02:00
. put ( indexMetaData , false )
2010-07-13 18:01:57 +03:00
. build ( ) ;
logger . info ( " [{}] creating index, cause [{}], shards [{}]/[{}], mappings {} " , request . index , request . cause , indexMetaData . numberOfShards ( ) , indexMetaData . numberOfReplicas ( ) , mappings . keySet ( ) ) ;
2010-08-29 01:24:23 +03:00
ClusterBlocks . Builder blocks = ClusterBlocks . builder ( ) . blocks ( currentState . blocks ( ) ) ;
if ( ! request . blocks . isEmpty ( ) ) {
for ( ClusterBlock block : request . blocks ) {
blocks . addIndexBlock ( request . index , block ) ;
}
}
2010-10-23 21:52:09 +02:00
if ( request . state = = State . CLOSE ) {
blocks . addIndexBlock ( request . index , MetaDataStateIndexService . INDEX_CLOSED_BLOCK ) ;
}
2010-08-29 01:24:23 +03:00
2011-04-17 18:39:00 +03:00
ClusterState updatedState = newClusterStateBuilder ( ) . state ( currentState ) . blocks ( blocks ) . metaData ( newMetaData ) . build ( ) ;
if ( request . state = = State . OPEN ) {
2012-06-29 01:01:26 +02:00
RoutingTable . Builder routingTableBuilder = RoutingTable . builder ( ) . routingTable ( updatedState . routingTable ( ) )
2012-09-17 16:00:55 +02:00
. addAsNew ( updatedState . metaData ( ) . index ( request . index ) ) ;
2011-09-06 17:11:55 +03:00
RoutingAllocation . Result routingResult = allocationService . reroute ( newClusterStateBuilder ( ) . state ( updatedState ) . routingTable ( routingTableBuilder ) . build ( ) ) ;
2011-04-17 18:39:00 +03:00
updatedState = newClusterStateBuilder ( ) . state ( updatedState ) . routingResult ( routingResult ) . build ( ) ;
}
2011-08-03 14:32:33 +03:00
// we wait for events from all nodes that the index has been added to the metadata
final AtomicInteger counter = new AtomicInteger ( currentState . nodes ( ) . size ( ) ) ;
2011-04-27 00:24:27 +03:00
2011-08-03 14:32:33 +03:00
final NodeIndexCreatedAction . Listener nodeIndexCreatedListener = new NodeIndexCreatedAction . Listener ( ) {
2011-12-06 02:42:25 +02:00
@Override
public void onNodeIndexCreated ( String index , String nodeId ) {
2011-08-03 14:32:33 +03:00
if ( index . equals ( request . index ) ) {
if ( counter . decrementAndGet ( ) = = 0 ) {
listener . onResponse ( new Response ( true , indexMetaData ) ) ;
nodeIndexCreatedAction . remove ( this ) ;
2011-04-27 00:24:27 +03:00
}
}
2011-08-03 14:32:33 +03:00
}
} ;
2011-04-27 00:24:27 +03:00
2011-08-03 14:32:33 +03:00
nodeIndexCreatedAction . add ( nodeIndexCreatedListener ) ;
2011-04-27 00:24:27 +03:00
2011-08-03 14:32:33 +03:00
listener . future = threadPool . schedule ( request . timeout , ThreadPool . Names . SAME , new Runnable ( ) {
2011-12-06 02:42:25 +02:00
@Override
public void run ( ) {
2011-08-03 14:32:33 +03:00
listener . onResponse ( new Response ( false , indexMetaData ) ) ;
nodeIndexCreatedAction . remove ( nodeIndexCreatedListener ) ;
}
} ) ;
2011-04-27 00:24:27 +03:00
2011-04-17 18:39:00 +03:00
return updatedState ;
2012-11-26 10:15:09 +01:00
} catch ( Throwable e ) {
2011-01-21 01:57:16 +02:00
logger . warn ( " [{}] failed to create " , e , request . index ) ;
2013-01-30 10:39:38 -05:00
if ( indexCreated ) {
// Index was already partially created - need to clean up
2013-03-04 20:41:19 -08:00
indicesService . removeIndex ( request . index , failureReason ! = null ? failureReason : " failed to create index " ) ;
2013-01-30 10:39:38 -05:00
}
2010-07-13 18:01:57 +03:00
listener . onFailure ( e ) ;
return currentState ;
}
}
2011-12-06 02:42:25 +02:00
@Override
public void clusterStateProcessed ( ClusterState clusterState ) {
2010-07-13 18:01:57 +03:00
}
2010-10-19 19:33:27 +02:00
} ) ;
}
2010-07-13 18:01:57 +03:00
2011-04-27 00:24:27 +03:00
class CreateIndexListener implements Listener {
2011-09-02 09:36:25 +03:00
private final AtomicBoolean notified = new AtomicBoolean ( ) ;
private final MetaDataService . MdLock mdLock ;
2011-04-27 00:24:27 +03:00
private final Request request ;
private final Listener listener ;
volatile ScheduledFuture future ;
2011-09-02 09:36:25 +03:00
private CreateIndexListener ( MetaDataService . MdLock mdLock , Request request , Listener listener ) {
this . mdLock = mdLock ;
2011-04-27 00:24:27 +03:00
this . request = request ;
this . listener = listener ;
}
2011-12-06 02:42:25 +02:00
@Override
public void onResponse ( final Response response ) {
2011-04-27 00:24:27 +03:00
if ( notified . compareAndSet ( false , true ) ) {
2011-09-02 09:36:25 +03:00
mdLock . unlock ( ) ;
2011-04-27 00:24:27 +03:00
if ( future ! = null ) {
future . cancel ( false ) ;
}
listener . onResponse ( response ) ;
}
}
2011-12-06 02:42:25 +02:00
@Override
public void onFailure ( Throwable t ) {
2011-04-27 00:24:27 +03:00
if ( notified . compareAndSet ( false , true ) ) {
2011-09-02 09:36:25 +03:00
mdLock . unlock ( ) ;
2011-04-27 00:24:27 +03:00
if ( future ! = null ) {
future . cancel ( false ) ;
}
listener . onFailure ( t ) ;
}
}
}
2010-11-26 15:45:18 +02:00
/**
 * Parses a mapping source string into a map, using the parser created by the XContent
 * that {@code XContentFactory.xContent} returns for that source. The parser is closed
 * by {@code mapAndClose()}.
 */
private Map<String, Object> parseMapping(String mappingSource) throws Exception {
    XContentParser sourceParser = XContentFactory.xContent(mappingSource).createParser(mappingSource);
    return sourceParser.mapAndClose();
}
private void addMappings ( Map < String , Map < String , Object > > mappings , File mappingsDir ) {
2010-10-19 19:33:27 +02:00
File [ ] mappingsFiles = mappingsDir . listFiles ( ) ;
for ( File mappingFile : mappingsFiles ) {
2011-05-26 15:45:09 -05:00
if ( mappingFile . isHidden ( ) ) {
continue ;
}
2012-07-24 13:07:30 -04:00
int lastDotIndex = mappingFile . getName ( ) . lastIndexOf ( '.' ) ;
String mappingType = lastDotIndex ! = - 1 ? mappingFile . getName ( ) . substring ( 0 , lastDotIndex ) : mappingFile . getName ( ) ;
2010-10-19 19:33:27 +02:00
try {
2010-11-26 15:45:18 +02:00
String mappingSource = Streams . copyToString ( new FileReader ( mappingFile ) ) ;
if ( mappings . containsKey ( mappingType ) ) {
XContentHelper . mergeDefaults ( mappings . get ( mappingType ) , parseMapping ( mappingSource ) ) ;
} else {
mappings . put ( mappingType , parseMapping ( mappingSource ) ) ;
}
} catch ( Exception e ) {
logger . warn ( " failed to read / parse mapping [ " + mappingType + " ] from location [ " + mappingFile + " ], ignoring... " , e ) ;
2010-07-13 18:01:57 +03:00
}
}
}
2010-11-26 15:45:18 +02:00
private List < IndexTemplateMetaData > findTemplates ( Request request , ClusterState state ) {
List < IndexTemplateMetaData > templates = Lists . newArrayList ( ) ;
for ( IndexTemplateMetaData template : state . metaData ( ) . templates ( ) . values ( ) ) {
if ( Regex . simpleMatch ( template . template ( ) , request . index ) ) {
templates . add ( template ) ;
}
}
2012-02-15 11:45:58 +02:00
// see if we have templates defined under config
File templatesDir = new File ( environment . configFile ( ) , " templates " ) ;
if ( templatesDir . exists ( ) & & templatesDir . isDirectory ( ) ) {
File [ ] templatesFiles = templatesDir . listFiles ( ) ;
if ( templatesFiles ! = null ) {
for ( File templatesFile : templatesFiles ) {
XContentParser parser = null ;
try {
byte [ ] templatesData = Streams . copyToByteArray ( templatesFile ) ;
parser = XContentHelper . createParser ( templatesData , 0 , templatesData . length ) ;
IndexTemplateMetaData template = IndexTemplateMetaData . Builder . fromXContentStandalone ( parser ) ;
if ( Regex . simpleMatch ( template . template ( ) , request . index ) ) {
templates . add ( template ) ;
}
} catch ( Exception e ) {
2012-05-17 00:17:31 +03:00
logger . warn ( " [{}] failed to read template [{}] from config " , e , request . index , templatesFile . getAbsolutePath ( ) ) ;
2012-02-15 11:45:58 +02:00
} finally {
Closeables . closeQuietly ( parser ) ;
}
}
}
}
2010-11-26 15:45:18 +02:00
Collections . sort ( templates , new Comparator < IndexTemplateMetaData > ( ) {
2011-12-06 02:42:25 +02:00
@Override
public int compare ( IndexTemplateMetaData o1 , IndexTemplateMetaData o2 ) {
2010-11-26 15:45:18 +02:00
return o2 . order ( ) - o1 . order ( ) ;
}
} ) ;
return templates ;
}
private void validate ( Request request , ClusterState state ) throws ElasticSearchException {
if ( state . routingTable ( ) . hasIndex ( request . index ) ) {
throw new IndexAlreadyExistsException ( new Index ( request . index ) ) ;
}
if ( state . metaData ( ) . hasIndex ( request . index ) ) {
throw new IndexAlreadyExistsException ( new Index ( request . index ) ) ;
}
if ( request . index . contains ( " " ) ) {
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must not contain whitespace " ) ;
}
if ( request . index . contains ( " , " ) ) {
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must not contain ', " ) ;
}
if ( request . index . contains ( " # " ) ) {
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must not contain '# " ) ;
}
2011-01-13 16:20:31 +02:00
if ( ! request . index . equals ( riverIndexName ) & & ! request . index . equals ( PercolatorService . INDEX_NAME ) & & request . index . charAt ( 0 ) = = '_' ) {
2010-11-26 15:45:18 +02:00
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must not start with '_' " ) ;
}
if ( ! request . index . toLowerCase ( ) . equals ( request . index ) ) {
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must be lowercase " ) ;
}
if ( ! Strings . validFileName ( request . index ) ) {
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " must not contain the following characters " + Strings . INVALID_FILENAME_CHARS ) ;
}
2011-06-15 08:02:12 -04:00
if ( state . metaData ( ) . aliases ( ) . containsKey ( request . index ) ) {
2010-11-26 15:45:18 +02:00
throw new InvalidIndexNameException ( new Index ( request . index ) , request . index , " an alias with the same name already exists " ) ;
}
}
2010-07-13 18:01:57 +03:00
public static interface Listener {
void onResponse ( Response response ) ;
void onFailure ( Throwable t ) ;
}
public static class Request {
final String cause ;
final String index ;
2010-10-23 21:52:09 +02:00
State state = State . OPEN ;
2010-07-13 18:01:57 +03:00
Settings settings = ImmutableSettings . Builder . EMPTY_SETTINGS ;
Map < String , String > mappings = Maps . newHashMap ( ) ;
2012-05-07 14:27:30 +03:00
Map < String , IndexMetaData . Custom > customs = newHashMap ( ) ;
2010-07-13 18:01:57 +03:00
TimeValue timeout = TimeValue . timeValueSeconds ( 5 ) ;
2010-08-29 01:24:23 +03:00
Set < ClusterBlock > blocks = Sets . newHashSet ( ) ;
2011-04-17 18:39:00 +03:00
public Request ( String cause , String index ) {
2010-07-13 18:01:57 +03:00
this . cause = cause ;
this . index = index ;
}
public Request settings ( Settings settings ) {
this . settings = settings ;
return this ;
}
public Request mappings ( Map < String , String > mappings ) {
this . mappings . putAll ( mappings ) ;
return this ;
}
2010-11-15 21:21:18 +02:00
public Request mappingsMetaData ( Map < String , MappingMetaData > mappings ) throws IOException {
for ( Map . Entry < String , MappingMetaData > entry : mappings . entrySet ( ) ) {
this . mappings . put ( entry . getKey ( ) , entry . getValue ( ) . source ( ) . string ( ) ) ;
}
return this ;
}
2010-08-19 17:06:36 +03:00
public Request mappingsCompressed ( Map < String , CompressedString > mappings ) throws IOException {
for ( Map . Entry < String , CompressedString > entry : mappings . entrySet ( ) ) {
this . mappings . put ( entry . getKey ( ) , entry . getValue ( ) . string ( ) ) ;
}
return this ;
}
2012-05-07 14:27:30 +03:00
public Request customs ( Map < String , Custom > customs ) {
this . customs . putAll ( customs ) ;
return this ;
}
2010-08-29 01:24:23 +03:00
public Request blocks ( Set < ClusterBlock > blocks ) {
this . blocks . addAll ( blocks ) ;
return this ;
}
2010-10-23 21:52:09 +02:00
public Request state ( State state ) {
this . state = state ;
return this ;
}
2010-07-13 18:01:57 +03:00
public Request timeout ( TimeValue timeout ) {
this . timeout = timeout ;
return this ;
}
}
public static class Response {
private final boolean acknowledged ;
2010-08-20 00:45:17 +03:00
private final IndexMetaData indexMetaData ;
2010-07-13 18:01:57 +03:00
2010-08-20 00:45:17 +03:00
public Response ( boolean acknowledged , IndexMetaData indexMetaData ) {
2010-07-13 18:01:57 +03:00
this . acknowledged = acknowledged ;
2010-08-20 00:45:17 +03:00
this . indexMetaData = indexMetaData ;
2010-07-13 18:01:57 +03:00
}
public boolean acknowledged ( ) {
return acknowledged ;
}
2010-08-20 00:45:17 +03:00
public IndexMetaData indexMetaData ( ) {
return indexMetaData ;
}
2010-07-13 18:01:57 +03:00
}
}