OPENJPA-2030: First version of Audit facility

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@1161349 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Pinaki Poddar 2011-08-25 01:06:39 +00:00
parent a35c187d8f
commit 8996376462
6 changed files with 539 additions and 21 deletions

View File

@ -21,6 +21,8 @@ package org.apache.openjpa.conf;
import java.util.Collection;
import java.util.Map;
import org.apache.openjpa.kernel.AuditManager;
import org.apache.openjpa.audit.Auditor;
import org.apache.openjpa.datacache.CacheDistributionPolicy;
import org.apache.openjpa.datacache.DataCache;
import org.apache.openjpa.datacache.DataCacheManager;
@ -1863,5 +1865,20 @@ public interface OpenJPAConfiguration
*/
public InstrumentationManager getInstrumentationManagerInstance();
/**
* Gets an instance of {@link AuditManager} associated with this configuration.
*
* @since 2.2.0
*/
public Auditor getAuditorInstance();
/**
* Gets the plug-in string of {@link AuditManager} specified in this configuration.
*
* @since 2.2.0
*/
public String getAuditor();
}

View File

@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.openjpa.audit.AuditLogger;
import org.apache.openjpa.audit.Auditor;
import org.apache.openjpa.datacache.CacheDistributionPolicy;
import org.apache.openjpa.datacache.ConcurrentDataCache;
import org.apache.openjpa.datacache.ConcurrentQueryCache;
@ -98,6 +100,7 @@ public class OpenJPAConfigurationImpl
public BrokerValue brokerPlugin;
public ObjectValue dataCachePlugin;
public ObjectValue dataCacheManagerPlugin;
public ObjectValue auditorPlugin;
public ObjectValue cacheDistributionPolicyPlugin;
public IntValue dataCacheTimeout;
public ObjectValue queryCachePlugin;
@ -588,6 +591,13 @@ public class OpenJPAConfigurationImpl
instrumentationProviders.setAliases(aliases);
instrumentationProviders.setInstantiatingGetter("getInstrumentationInstances");
auditorPlugin = addPlugin("Auditor", true);
aliases = new String[] { "default", AuditLogger.class.getName(), };
auditorPlugin.setAliases(aliases);
auditorPlugin.setDefault(aliases[0]);
auditorPlugin.setString(aliases[0]);
auditorPlugin.setInstantiatingGetter("getAuditor");
// initialize supported options that some runtimes may not support
supportedOptions.add(OPTION_NONTRANS_READ);
supportedOptions.add(OPTION_OPTIMISTIC);
@ -1807,5 +1817,17 @@ public class OpenJPAConfigurationImpl
public Map<String, Object> getPersistenceEnvironment() {
return _peMap;
}
public Auditor getAuditorInstance() {
Auditor auditor = (Auditor) auditorPlugin.get();
if (auditor == null) {
auditor = (Auditor) auditorPlugin.instantiate(Auditor.class, this);
}
return auditor;
}
public String getAuditor() {
return auditorPlugin.getString();
}
}

View File

@ -33,7 +33,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.naming.NamingException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
@ -41,9 +40,9 @@ import javax.transaction.TransactionManager;
import org.apache.commons.collections.set.MapBackedSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.NestableRuntimeException;
import org.apache.openjpa.kernel.AuditManager;
import org.apache.openjpa.audit.Auditor;
import org.apache.openjpa.conf.BrokerValue;
import org.apache.openjpa.conf.MetaDataRepositoryValue;
import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.conf.OpenJPAConfigurationImpl;
import org.apache.openjpa.conf.OpenJPAVersion;
@ -67,7 +66,6 @@ import org.apache.openjpa.meta.MetaDataModes;
import org.apache.openjpa.meta.MetaDataRepository;
import org.apache.openjpa.util.GeneralException;
import org.apache.openjpa.util.InvalidStateException;
import org.apache.openjpa.util.MetaDataException;
import org.apache.openjpa.util.OpenJPAException;
import org.apache.openjpa.util.UserException;
@ -486,8 +484,7 @@ public abstract class AbstractBrokerFactory
else
bv = (BrokerValue) _conf.getValue(BrokerValue.KEY);
if (FinalizingBrokerImpl.class.isAssignableFrom(
bv.getTemplateBrokerType(_conf))) {
if (FinalizingBrokerImpl.class.isAssignableFrom(bv.getTemplateBrokerType(_conf))) {
return MapBackedSet.decorate(new ConcurrentHashMap(), new Object() { });
} else {
return new ConcurrentReferenceHashSet<Broker>(ConcurrentReferenceHashSet.WEAK);
@ -854,6 +851,13 @@ public abstract class AbstractBrokerFactory
* This method is invoked AFTER a BrokerFactory has been instantiated.
*/
public void postCreationCallback() {
Auditor auditor = _conf.getAuditorInstance();
if (auditor != null) {
AuditManager auditManager = new AuditManager();
auditManager.setAuditor(auditor);
addLifecycleListener(auditManager, null);
addTransactionListener(auditManager);
}
if (_conf.isInitializeEagerly()) {
newBroker(_conf.getConnectionUserName(), _conf.getConnectionPassword(),
_conf.isConnectionFactoryModeManaged(), _conf.getConnectionRetainModeConstant(), false).close();

View File

@ -0,0 +1,385 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.kernel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.openjpa.audit.Auditable;
import org.apache.openjpa.audit.AuditableOperation;
import org.apache.openjpa.audit.Auditor;
import org.apache.openjpa.enhance.PCRegistry;
import org.apache.openjpa.enhance.PCRegistry.RegisterClassListener;
import org.apache.openjpa.enhance.PersistenceCapable;
import org.apache.openjpa.enhance.StateManager;
import org.apache.openjpa.event.LifecycleEvent;
import org.apache.openjpa.event.LifecycleListener;
import org.apache.openjpa.event.TransactionEvent;
import org.apache.openjpa.event.TransactionListener;
/**
* Controller for audit facility.
* This controller performs the following basic duties:
* <LI> Records auditable types at class laoding time
* <LI> Listens to instance life cycle changes and transaction.
* <LI> Collects auditable instances on instance life cycle changes.
* <LI> Delegates real auditing to the {@link Auditor} at transaction boundary.
*
* @author Pinaki Poddar
*
*/
public class AuditManager extends InMemorySavepointManager
implements LifecycleListener, TransactionListener, RegisterClassListener {
private Auditor _auditor;
private final Set<Class<?>> _allTypes;
private final Set<Class<?>> _newTypes;
private final Set<Class<?>> _updateTypes;
private final Set<Class<?>> _deleteTypes;
private final Map<Broker, Set<Audited>> _saved;
private final ReentrantLock _lock = new ReentrantLock();
public AuditManager() {
super();
setPreFlush(false);
_allTypes = new HashSet<Class<?>>();
_newTypes = new HashSet<Class<?>>();
_updateTypes = new HashSet<Class<?>>();
_deleteTypes = new HashSet<Class<?>>();
_saved = new ConcurrentHashMap<Broker, Set<Audited>>();
PCRegistry.addRegisterClassListener(this);
}
/**
* Records all auditable classes in operation-specific sets.
*/
@Override
public void register(Class<?> cls) {
Auditable auditable = cls.getAnnotation(Auditable.class);
if (auditable == null) {
return;
}
List<AuditableOperation> events = Arrays.asList(auditable.values());
if (events.contains(AuditableOperation.ALL) || events.contains(AuditableOperation.CREATE)) {
_newTypes.add(cls);
_allTypes.add(cls);
}
if (events.contains(AuditableOperation.ALL) || events.contains(AuditableOperation.UPDATE)) {
_updateTypes.add(cls);
_allTypes.add(cls);
}
if (events.contains(AuditableOperation.ALL) || events.contains(AuditableOperation.DELETE)) {
_deleteTypes.add(cls);
_allTypes.add(cls);
}
}
public void setAuditor(Auditor auditor) {
_auditor = auditor;
}
public Auditor getAuditor() {
return _auditor;
}
public Set<Class<?>> getAuditedTypes() {
return Collections.unmodifiableSet(_allTypes);
}
/**
* Transaction callbacks.
*/
@Override
public void beforeCommit(TransactionEvent event) {
if (_auditor == null) return;
_lock.lock();
try {
Broker broker = (Broker)event.getSource();
Set<Audited> audits = _saved.get(broker);
if (audits == null) return;
Collection<Audited> news = new HashSet<Audited>();
Collection<Audited> updates = new HashSet<Audited>();
Collection<Audited> deletes = new HashSet<Audited>();
Collection<?> instances = event.getTransactionalObjects();
for (Object instance : instances) {
StateManagerImpl sm = getImpl(instance);
if (sm != null) {
Audited audited = search(audits, sm);
if (audited == null) {
continue;
}
if (sm.getPCState().isNew()) {
news.add(audited);
} else if (sm.getPCState().isDeleted()) {
deletes.add(audited);
} else if (sm.getPCState().isDirty()) {
updates.add(audited);
}
}
}
try {
_auditor.audit(broker, news, updates, deletes);
} catch (Exception e) {
if (_auditor.isRollbackOnError()) {
throw new RuntimeException("dump", e);
} else {
e.printStackTrace();
}
}
} finally {
_lock.unlock();
}
}
@Override
public void afterCommit(TransactionEvent event) {
_saved.remove(event.getSource());
}
@Override
public void afterRollback(TransactionEvent event) {
_saved.remove(event.getSource());
}
@Override
public void afterBegin(TransactionEvent event) {
}
@Override
public void beforeFlush(TransactionEvent event) {
}
@Override
public void afterFlush(TransactionEvent event) {
}
@Override
public void afterStateTransitions(TransactionEvent event) {
}
@Override
public void afterCommitComplete(TransactionEvent event) {
}
@Override
public void afterRollbackComplete(TransactionEvent event) {
}
/**
* Life-cycle callbacks
*/
@Override
public void afterLoad(LifecycleEvent event) {
save(AuditableOperation.ALL, event);
}
@Override
public void afterPersist(LifecycleEvent event) {
save(AuditableOperation.CREATE, event);
}
@Override
public void beforeDelete(LifecycleEvent event) {
save(AuditableOperation.DELETE, event);
}
@Override
public void beforeDirty(LifecycleEvent event) {
if (!isAudited(event)) {
save(AuditableOperation.UPDATE, event);
}
}
@Override
public void beforePersist(LifecycleEvent event) {
}
@Override
public void afterRefresh(LifecycleEvent event) {
}
@Override
public void beforeStore(LifecycleEvent event) {
}
@Override
public void afterStore(LifecycleEvent event) {
}
@Override
public void beforeClear(LifecycleEvent event) {
}
@Override
public void afterClear(LifecycleEvent event) {
}
@Override
public void afterDelete(LifecycleEvent event) {
}
@Override
public void afterDirty(LifecycleEvent event) {
}
@Override
public void beforeDirtyFlushed(LifecycleEvent event) {
}
@Override
public void afterDirtyFlushed(LifecycleEvent event) {
}
@Override
public void beforeDetach(LifecycleEvent event) {
}
@Override
public void afterDetach(LifecycleEvent event) {
}
@Override
public void beforeAttach(LifecycleEvent event) {
}
@Override
public void afterAttach(LifecycleEvent event) {
}
/**
* Support functions.
*/
/**
* Extracts the persistence capable instance from the source of the given event.
* @return null if an instance can not be extracted.
*/
protected PersistenceCapable getPersistenceCapable(LifecycleEvent evt) {
Object source = evt.getSource();
return source instanceof PersistenceCapable ? (PersistenceCapable)source : null;
}
/**
* Affirms if source of the given event is being audited already.
*/
protected boolean isAudited(LifecycleEvent event) {
StateManagerImpl sm = getImpl(event.getSource());
if (_saved.containsKey(sm.getBroker())) {
return search(_saved.get(sm.getBroker()), sm) != null;
}
return false;
}
/**
* Extracts the broker from the given persistence capable instance.
* @param pc a persistence capable instance
* @return null if a Broker can notbe extracted
*/
protected Broker getBroker(PersistenceCapable pc) {
if (pc == null) return null;
Object ctx = pc.pcGetGenericContext();
return ctx instanceof Broker ? (Broker)ctx : null;
}
/**
* Saves the source of the given event for auditing.
* @param lc
* @param event
*/
protected void save(AuditableOperation lc, LifecycleEvent event) {
StateManagerImpl sm = getImpl(event.getSource());
if (sm != null && isAuditable(lc, sm)) {
Broker broker = sm.getBroker();
OpenJPASavepoint savepoint = newSavepoint("", broker);
savepoint.save(Collections.singleton(sm));
Map<StateManagerImpl, SavepointFieldManager> states = savepoint.getStates();
Map.Entry<StateManagerImpl, SavepointFieldManager> e = states.entrySet().iterator().next();
PersistenceCapable copy = e.getValue().getCopy();
copy.pcReplaceStateManager(null);
Audited audited = new Audited(sm, copy);
Set<Audited> audits = _saved.get(broker);
if (audits == null) {
audits = new HashSet<Audited>();
_saved.put(broker, audits);
}
audits.add(audited);
}
}
/**
* Searches the set of Auditable instances for a matching Statemanager.
* @param audits
* @param sm
* @return
*/
private Audited search(Set<Audited> audits, StateManagerImpl sm) {
for (Audited audit : audits) {
if (audit.getManagedObject() == sm.getPersistenceCapable()) {
return audit;
}
}
return null;
}
/**
* Gets an implementation.
* @param instance
* @return
*/
private StateManagerImpl getImpl(Object instance) {
if (instance instanceof PersistenceCapable) {
StateManager sm = ((PersistenceCapable)instance).pcGetStateManager();
if (sm instanceof StateManagerImpl) {
return (StateManagerImpl)sm;
} else {
return null;
}
} else {
return null;
}
}
/**
* Affirms if the given state manager is auditable for the given operation.
* @param op an auditable operation
* @param sm
* @return
*/
protected boolean isAuditable(AuditableOperation op, StateManagerImpl sm) {
if (sm == null)
return false;
Class<?> cls = sm.getMetaData().getDescribedType();
return (op == AuditableOperation.ALL && _allTypes.contains(cls)
|| op == AuditableOperation.CREATE && _newTypes.contains(cls))
||(op == AuditableOperation.UPDATE && _updateTypes.contains(cls))
||(op == AuditableOperation.DELETE && _deleteTypes.contains(cls));
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.kernel;
import java.util.BitSet;
import org.apache.openjpa.audit.AuditableOperation;
import org.apache.openjpa.enhance.PersistenceCapable;
/**
* Carries immutable information about an audited persistent instance.
*
* @author Pinaki Poddar
*
*/
public final class Audited {
private final StateManagerImpl _sm;
private final PersistenceCapable _original;
/**
* Supply a state manager and a transient copy.
* @param sm a state manager, must not be null.
* @param o the transient copy
*/
Audited(StateManagerImpl sm, PersistenceCapable o) {
if (sm == null || o == null)
throw new NullPointerException("sm: " + sm + " original: " + o);
if (o.pcGetStateManager() != null)
throw new IllegalArgumentException(o + " is not transient");
_sm = sm;
_original = o;
}
/**
* Gets the current state of the persistent instance.
*/
public Object getManagedObject() {
return _sm.getManagedInstance();
}
/**
* Gets the original state of the persistent instance as a transient instance.
*/
public Object getOriginalObject() {
return _original;
}
/**
* Gets the name of the updated fields.
*
* @return persistent property names that are modified.
* For deleted instances the array is empty and for newly created instances
* the array contains all the fields.
*/
public String[] getUpdatedFields() {
BitSet dirty = _sm.getDirty();
String[] names = new String[dirty.cardinality()];
int j = 0;
for (int i = 0; i < dirty.size(); i++) {
if (dirty.get(i)) {
names[j++] = _sm.getMetaData().getField(i).getName();
}
}
return names;
}
/**
* Gets the type of this audit.
*/
public AuditableOperation getType() {
PCState state = _sm.getPCState();
if (state.isNew()) return AuditableOperation.CREATE;
if (state.isDeleted()) return AuditableOperation.DELETE;
if (state.isDirty()) return AuditableOperation.UPDATE;
return null; // should not happen
}
}

View File

@ -21,7 +21,6 @@ package org.apache.openjpa.kernel;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
@ -31,14 +30,14 @@ import java.util.Map;
* @author Steve Kim
* @since 0.3.4
*/
@SuppressWarnings("serial")
public class OpenJPASavepoint implements Serializable {
private final Broker _broker;
private final String _name;
private final boolean _copy;
// <StateManagerImpl, SavepointFieldManager>
private Map _saved;
private Map<StateManagerImpl, SavepointFieldManager> _saved;
/**
* Constructor. Indicate whether to copy field data into memory.
@ -73,7 +72,7 @@ public class OpenJPASavepoint implements Serializable {
/**
* Return the map of states to savepoint data.
*/
protected Map getStates() {
protected Map<StateManagerImpl, SavepointFieldManager> getStates() {
return _saved;
}
@ -81,14 +80,12 @@ public class OpenJPASavepoint implements Serializable {
* Set this savepoint, saving any state for the passed-in
* {@link OpenJPAStateManager}s as necessary.
*/
public void save(Collection states) {
public void save(Collection<StateManagerImpl> states) {
if (_saved != null)
throw new IllegalStateException();
_saved = new HashMap((int) (states.size() * 1.33 + 1));
StateManagerImpl sm;
for (Iterator i = states.iterator(); i.hasNext();) {
sm = (StateManagerImpl) i.next();
_saved = new HashMap<StateManagerImpl, SavepointFieldManager>((int) (states.size() * 1.33 + 1));
for (StateManagerImpl sm : states) {
_saved.put(sm, new SavepointFieldManager(sm, _copy));
}
}
@ -110,16 +107,16 @@ public class OpenJPASavepoint implements Serializable {
*
* @param previous previous savepoints set in the transaction
*/
public Collection rollback(Collection previous) {
Map saved;
public Collection<SavepointFieldManager> rollback(Collection<OpenJPASavepoint> previous) {
Map<StateManagerImpl, SavepointFieldManager> saved;
if (previous.isEmpty())
saved = _saved;
else {
// merge all changes into one collection, allowing for later
// SavepointFieldManagers to replace previous ones.
saved = new HashMap();
for (Iterator i = previous.iterator(); i.hasNext();)
saved.putAll(((OpenJPASavepoint) i.next()).getStates());
saved = new HashMap<StateManagerImpl, SavepointFieldManager>();
for (OpenJPASavepoint savepoint : previous)
saved.putAll(savepoint.getStates());
saved.putAll(_saved);
}
_saved = null;