package com.framsticks.util.dispatching; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import com.framsticks.util.FramsticksException; import com.framsticks.util.lang.Strings; // import static com.framsticks.util.dispatching.AbstractJoinable.wrap; public class BufferedDispatcher extends AbstractJoinable implements Dispatcher, JoinableParent { private static final Logger log = LogManager.getLogger(BufferedDispatcher.class); protected final RunnableQueue queue = new RunnableQueue<>(); protected Dispatcher targetDispatcher; protected boolean buffer = true; protected Dispatcher parent; /** * @param parent */ public BufferedDispatcher(Dispatcher parent) { this.parent = parent; } /** * @return the targetDispatcher */ public synchronized Dispatcher getTargetDispatcher() { return targetDispatcher; } /** * @param targetDispatcher the targetDispatcher to set */ public synchronized void setTargetDispatcher(Dispatcher targetDispatcher) { if (this.targetDispatcher != null) { throw new FramsticksException().msg("dispatcher is already set").arg("dispatcher", this.targetDispatcher).arg("in", this); } if (targetDispatcher == null) { throw new FramsticksException().msg("trying to set empty target dispatcher").arg("in", this); } log.debug("setting {} to {}", this, targetDispatcher); this.targetDispatcher = targetDispatcher; flushQueue(); } public synchronized boolean isActive() { if (targetDispatcher == null || buffer) { return false; // throw new FramsticksException().msg("no dispatcher is set for tree yet").arg("tree", this); } return targetDispatcher.isActive(); } public synchronized void dispatch(final RunAt runnable) { if (targetDispatcher != null && !buffer) { targetDispatcher.dispatch(runnable); return; } queue.push(runnable); } protected void flushQueue() { if (this.buffer || targetDispatcher == null) { return; } log.debug("flushing {} tasks in {}", queue.size(), this); while (!queue.isEmpty()) { targetDispatcher.dispatch(queue.pollFirst()); } } public synchronized void setBuffer(final boolean buffer) { if (this.buffer == buffer) { return; } if (buffer) { this.buffer = true; return; } this.buffer = false; flushQueue(); } @Override public String toString() { return parent + " -> " + Strings.toStringNullProof(targetDispatcher, ""); } public void createThreadIfNeeded() { synchronized (this) { if (targetDispatcher != null) { return; } } this.setTargetDispatcher(new Thread().setName(parent.getName())); } @Override public String getName() { return parent + " buffered dispatcher"; } @Override protected synchronized void joinableStart() { Dispatching.use(targetDispatcher, this); } @Override protected synchronized void joinableInterrupt() { Dispatching.drop(targetDispatcher, this); finishJoinable(); } @Override protected void joinableFinish() { } @Override protected synchronized void joinableJoin() throws InterruptedException { Dispatching.join(targetDispatcher); } @Override public void childChangedState(Joinable joinable, JoinableState state) { proceedToState(state); } }