1
package basic.util.event;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
/**
 * Registry of listeners that can be notified either sequentially on the
 * calling thread or concurrently on worker threads.
 *
 * <p>The listener list is guarded by a fair {@link ReentrantReadWriteLock}:
 * {@link #register(Object)} and {@link #unregister(Object)} take the write
 * lock, both notification methods take the read lock and keep it until every
 * listener has been invoked.
 *
 * <p><b>Re-entrancy:</b> a {@code ReentrantReadWriteLock} cannot be upgraded
 * from read to write. A listener that calls {@link #register(Object)} or
 * {@link #unregister(Object)} on this instance while it is being notified
 * will therefore block until the notification has finished, which for a call
 * made from inside the notification itself means it never proceeds.
 *
 * @param <Listener> type of the registered listeners
 */
public class Observable<Listener> {

    /** Fair read/write lock guarding {@link #listeners}. */
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);

    /** Read side of the lock; held while listeners are being notified. */
    protected final Lock readLock = readWriteLock.readLock();

    /** Write side of the lock; held while the listener list is modified. */
    protected final Lock writeLock = readWriteLock.writeLock();

    /** Registered listeners in registration order; duplicates are allowed. */
    private final List<Listener> listeners = new ArrayList<>();

    /**
     * Appends a listener to the list of listeners.
     *
     * @param listener the listener to add
     * @return the same {@code listener}, so the call can be used inline
     */
    public Listener register(Listener listener) {
        this.writeLock.lock();
        try {
            this.listeners.add(listener);
        } finally {
            this.writeLock.unlock();
        }
        return listener;
    }

    /**
     * Removes the first occurrence of a listener, if it is registered.
     *
     * @param listener the listener to remove
     */
    public void unregister(Listener listener) {
        this.writeLock.lock();
        try {
            this.listeners.remove(listener);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Invokes {@code consumer} once per registered listener, in registration
     * order, on the calling thread. An exception thrown by the consumer
     * aborts the iteration and propagates to the caller.
     *
     * @param consumer the action to apply to every listener
     */
    public void notifyListener(Consumer<? super Listener> consumer) {
        this.readLock.lock();
        try {
            this.listeners.forEach(consumer);
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Invokes {@code consumer} for every registered listener on the threads
     * of a freshly created cached thread pool and blocks until all of those
     * invocations have completed.
     *
     * <p>Exceptions thrown by the consumer are captured by the task's
     * {@code Future}, which is discarded, so they are not reported to the
     * caller. If the calling thread is interrupted while waiting, the
     * interruption is logged, the thread's interrupt status is restored and
     * the method returns without waiting for the remaining invocations.
     *
     * @param consumer the action to apply to every listener
     */
    public void syncNotifyListener(Consumer<? super Listener> consumer) {
        // Create the pool before taking the lock: a failure here must not
        // leave the read lock held.
        ExecutorService executorService = Executors.newCachedThreadPool();
        this.readLock.lock();
        try {
            for (Listener listener : this.listeners) {
                executorService.submit(new Actor<Listener>(listener, consumer));
            }
        } finally {
            try {
                executorService.shutdown();
                awaitCompletion(executorService);
            } finally {
                // Released last so the list cannot change while tasks still run,
                // and released even if shutting down or logging throws.
                this.readLock.unlock();
            }
        }
    }

    /**
     * Waits until all tasks of an already shut down executor have finished,
     * or until the calling thread is interrupted.
     */
    private static void awaitCompletion(ExecutorService executorService) {
        try {
            boolean terminated = false;
            while (!terminated) {
                terminated = executorService.awaitTermination(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException exception) {
            Log.severe(exception);
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Task that applies a consumer to a single listener.
     *
     * @param <T> type of the listener handed to the consumer
     */
    class Actor<T> implements Callable<Object> {

        private final T listener;
        private final Consumer<? super T> consumer;

        /**
         * @param listener the listener to pass to the consumer
         * @param consumer the action to run
         */
        public Actor(T listener, Consumer<? super T> consumer) {
            this.listener = listener;
            this.consumer = consumer;
        }

        /**
         * Applies the consumer to the listener.
         *
         * @return always {@code null}
         */
        @Override
        public Object call() throws Exception {
            consumer.accept(listener);
            return null;
        }
    }
}
No comments:
Post a Comment