HBASE-11302 ReplicationSourceManager#sources is not thread safe (Qianxi Zhang)

This commit is contained in:
Ted Yu 2014-06-06 16:57:57 +00:00
parent 623cfa33d1
commit b5e660fff4
1 changed files with 5 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -113,13 +114,15 @@ public class ReplicationSourceManager implements ReplicationListener {
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) { final Path oldLogDir, final UUID clusterId) {
this.sources = new ArrayList<ReplicationSourceInterface>(); //CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.replicationQueues = replicationQueues; this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers; this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker; this.replicationTracker = replicationTracker;
this.stopper = stopper; this.stopper = stopper;
this.hlogsById = new HashMap<String, SortedSet<String>>(); this.hlogsById = new HashMap<String, SortedSet<String>>();
this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.conf = conf; this.conf = conf;
this.fs = fs; this.fs = fs;
this.logDir = logDir; this.logDir = logDir;