NIFI-3274 Adding WriteAheadLog configuration options to WriteAheadLogLocalStateProvider

This closes #1386.
This commit is contained in:
jpercivall 2017-01-03 18:13:54 -05:00 committed by Pierre Villard
parent 68057cb4af
commit 273e69f2cb
3 changed files with 58 additions and 8 deletions

View File

@ -54,6 +54,8 @@ import org.wali.WriteAheadRepository;
public class WriteAheadLocalStateProvider extends AbstractStateProvider { public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class); private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
private volatile boolean alwaysSync;
private final StateMapSerDe serde; private final StateMapSerDe serde;
private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory()); private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
@ -66,6 +68,33 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor ALWAYS_SYNC = new PropertyDescriptor.Builder()
.name("Always Sync")
.description("If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very " +
"expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the " +
"operating system crashes. The default value is false.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Partitions")
.description("The number of partitions.")
.addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
.defaultValue("16")
.required(true)
.build();
static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
.name("Checkpoint Interval")
.description("The amount of time between checkpoints.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("2 mins")
.required(true)
.build();
private WriteAheadRepository<StateMapUpdate> writeAheadLog; private WriteAheadRepository<StateMapUpdate> writeAheadLog;
private AtomicLong versionGenerator; private AtomicLong versionGenerator;
@ -75,6 +104,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
@Override @Override
public synchronized void init(final StateProviderInitializationContext context) throws IOException { public synchronized void init(final StateProviderInitializationContext context) throws IOException {
long checkpointIntervalMillis = context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
int numPartitions = context.getProperty(NUM_PARTITIONS).asInteger();
alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean();
final File basePath = new File(context.getProperty(PATH).getValue()); final File basePath = new File(context.getProperty(PATH).getValue());
if (!basePath.exists() && !basePath.mkdirs()) { if (!basePath.exists() && !basePath.mkdirs()) {
@ -94,7 +128,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
} }
versionGenerator = new AtomicLong(-1L); versionGenerator = new AtomicLong(-1L);
writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, serde, null); writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), numPartitions, serde, null);
final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords(); final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords();
long maxRecordVersion = -1L; long maxRecordVersion = -1L;
@ -110,7 +144,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
} }
final String componentId = update.getComponentId(); final String componentId = update.getComponentId();
componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap())); componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap(), alwaysSync));
} }
// keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more // keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more
@ -118,13 +152,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
// the init() method completes, this is okay to do. // the init() method completes, this is okay to do.
versionGenerator.set(maxRecordVersion); versionGenerator.set(maxRecordVersion);
executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES); executor.scheduleWithFixedDelay(new CheckpointTask(), checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS);
} }
@Override @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PATH); properties.add(PATH);
properties.add(ALWAYS_SYNC);
properties.add(CHECKPOINT_INTERVAL);
properties.add(NUM_PARTITIONS);
return properties; return properties;
} }
@ -144,7 +181,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
ComponentProvider componentProvider = componentProviders.get(componentId); ComponentProvider componentProvider = componentProviders.get(componentId);
if (componentProvider == null) { if (componentProvider == null) {
final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L); final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L);
componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap); componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap, alwaysSync);
final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider); final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider);
if (existingComponentProvider != null) { if (existingComponentProvider != null) {
@ -190,14 +227,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private final AtomicLong versionGenerator; private final AtomicLong versionGenerator;
private final WriteAheadRepository<StateMapUpdate> wal; private final WriteAheadRepository<StateMapUpdate> wal;
private final String componentId; private final String componentId;
private final boolean alwaysSync;
private StateMap stateMap; private StateMap stateMap;
public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap) { public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap, final boolean alwaysSync) {
this.wal = wal; this.wal = wal;
this.versionGenerator = versionGenerator; this.versionGenerator = versionGenerator;
this.componentId = componentId; this.componentId = componentId;
this.stateMap = stateMap; this.stateMap = stateMap;
this.alwaysSync = alwaysSync;
} }
public synchronized StateMap getState() throws IOException { public synchronized StateMap getState() throws IOException {
@ -211,7 +250,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
public synchronized void setState(final Map<String, String> state) throws IOException { public synchronized void setState(final Map<String, String> state) throws IOException {
stateMap = new StandardStateMap(state, versionGenerator.incrementAndGet()); stateMap = new StandardStateMap(state, versionGenerator.incrementAndGet());
final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
wal.update(Collections.singleton(updateRecord), false); wal.update(Collections.singleton(updateRecord), alwaysSync);
} }
// see above explanation as to why this method is synchronized. // see above explanation as to why this method is synchronized.
@ -227,14 +266,14 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
stateMap = new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet()); stateMap = new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet());
final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
wal.update(Collections.singleton(updateRecord), false); wal.update(Collections.singleton(updateRecord), alwaysSync);
return true; return true;
} }
public synchronized void clear() throws IOException { public synchronized void clear() throws IOException {
stateMap = new StandardStateMap(null, versionGenerator.incrementAndGet()); stateMap = new StandardStateMap(null, versionGenerator.incrementAndGet());
final StateMapUpdate update = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); final StateMapUpdate update = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
wal.update(Collections.singleton(update), false); wal.update(Collections.singleton(update), alwaysSync);
} }
} }

View File

@ -45,6 +45,9 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
provider = new WriteAheadLocalStateProvider(); provider = new WriteAheadLocalStateProvider();
final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>(); final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>();
properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null)); properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null));
properties.put(WriteAheadLocalStateProvider.ALWAYS_SYNC, new StandardPropertyValue("false", null));
properties.put(WriteAheadLocalStateProvider.CHECKPOINT_INTERVAL, new StandardPropertyValue("2 mins", null));
properties.put(WriteAheadLocalStateProvider.NUM_PARTITIONS, new StandardPropertyValue("16", null));
provider.initialize(new StateProviderInitializationContext() { provider.initialize(new StateProviderInitializationContext() {
@Override @Override

View File

@ -25,11 +25,19 @@
Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
is important that the directory be copied over to the new version when upgrading NiFi. is important that the directory be copied over to the new version when upgrading NiFi.
Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very
expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the
operating system crashes. The default value is false.
Partitions - The number of partitions.
Checkpoint Interval - The amount of time between checkpoints.
--> -->
<local-provider> <local-provider>
<id>local-provider</id> <id>local-provider</id>
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class> <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
<property name="Directory">./state/local</property> <property name="Directory">./state/local</property>
<property name="Always Sync">false</property>
<property name="Partitions">16</property>
<property name="Checkpoint Interval">2 mins</property>
</local-provider> </local-provider>
<!-- <!--