[101] | 1 | package com.framsticks.util.dispatching; |
---|
| 2 | |
---|
| 3 | |
---|
[102] | 4 | import org.apache.logging.log4j.Logger; |
---|
| 5 | import org.apache.logging.log4j.LogManager; |
---|
[101] | 6 | |
---|
[102] | 7 | import com.framsticks.util.FramsticksException; |
---|
| 8 | import com.framsticks.util.lang.Strings; |
---|
| 9 | // import static com.framsticks.util.dispatching.AbstractJoinable.wrap; |
---|
| 10 | |
---|
| 11 | public class BufferedDispatcher<C> extends AbstractJoinable implements Dispatcher<C>, JoinableParent { |
---|
| 12 | private static final Logger log = LogManager.getLogger(BufferedDispatcher.class); |
---|
| 13 | |
---|
[101] | 14 | protected final RunnableQueue<C> queue = new RunnableQueue<>(); |
---|
| 15 | |
---|
| 16 | protected Dispatcher<C> targetDispatcher; |
---|
| 17 | |
---|
[102] | 18 | protected boolean buffer = true; |
---|
| 19 | protected Dispatcher<C> parent; |
---|
[101] | 20 | |
---|
| 21 | /** |
---|
[102] | 22 | * @param parent |
---|
| 23 | */ |
---|
| 24 | public BufferedDispatcher(Dispatcher<C> parent) { |
---|
| 25 | this.parent = parent; |
---|
| 26 | } |
---|
| 27 | |
---|
| 28 | /** |
---|
[101] | 29 | * @return the targetDispatcher |
---|
| 30 | */ |
---|
| 31 | public synchronized Dispatcher<C> getTargetDispatcher() { |
---|
| 32 | return targetDispatcher; |
---|
| 33 | } |
---|
| 34 | |
---|
| 35 | /** |
---|
| 36 | * @param targetDispatcher the targetDispatcher to set |
---|
| 37 | */ |
---|
| 38 | public synchronized void setTargetDispatcher(Dispatcher<C> targetDispatcher) { |
---|
[102] | 39 | if (this.targetDispatcher != null) { |
---|
| 40 | throw new FramsticksException().msg("dispatcher is already set").arg("dispatcher", this.targetDispatcher).arg("in", this); |
---|
| 41 | } |
---|
| 42 | if (targetDispatcher == null) { |
---|
| 43 | throw new FramsticksException().msg("trying to set empty target dispatcher").arg("in", this); |
---|
| 44 | } |
---|
| 45 | log.debug("setting {} to {}", this, targetDispatcher); |
---|
[101] | 46 | this.targetDispatcher = targetDispatcher; |
---|
[102] | 47 | flushQueue(); |
---|
[101] | 48 | } |
---|
| 49 | |
---|
| 50 | public synchronized boolean isActive() { |
---|
[102] | 51 | if (targetDispatcher == null || buffer) { |
---|
[101] | 52 | return false; |
---|
| 53 | // throw new FramsticksException().msg("no dispatcher is set for tree yet").arg("tree", this); |
---|
| 54 | } |
---|
| 55 | return targetDispatcher.isActive(); |
---|
| 56 | } |
---|
| 57 | |
---|
| 58 | public synchronized void dispatch(final RunAt<? extends C> runnable) { |
---|
| 59 | if (targetDispatcher != null && !buffer) { |
---|
| 60 | targetDispatcher.dispatch(runnable); |
---|
| 61 | return; |
---|
| 62 | } |
---|
| 63 | queue.push(runnable); |
---|
| 64 | } |
---|
| 65 | |
---|
[102] | 66 | protected void flushQueue() { |
---|
| 67 | if (this.buffer || targetDispatcher == null) { |
---|
| 68 | return; |
---|
| 69 | } |
---|
| 70 | log.debug("flushing {} tasks in {}", queue.size(), this); |
---|
| 71 | while (!queue.isEmpty()) { |
---|
| 72 | targetDispatcher.dispatch(queue.pollFirst()); |
---|
| 73 | } |
---|
| 74 | } |
---|
| 75 | |
---|
[101] | 76 | public synchronized void setBuffer(final boolean buffer) { |
---|
| 77 | if (this.buffer == buffer) { |
---|
| 78 | return; |
---|
| 79 | } |
---|
| 80 | if (buffer) { |
---|
| 81 | this.buffer = true; |
---|
| 82 | return; |
---|
| 83 | } |
---|
| 84 | this.buffer = false; |
---|
[102] | 85 | flushQueue(); |
---|
| 86 | } |
---|
| 87 | |
---|
| 88 | @Override |
---|
| 89 | public String toString() { |
---|
| 90 | return parent + " -> " + Strings.toStringNullProof(targetDispatcher, "<null>"); |
---|
| 91 | } |
---|
| 92 | |
---|
| 93 | public void createThreadIfNeeded() { |
---|
| 94 | if (targetDispatcher != null) { |
---|
| 95 | return; |
---|
[101] | 96 | } |
---|
[102] | 97 | this.setTargetDispatcher(new Thread<C>().setName(parent.getName())); |
---|
| 98 | } |
---|
[101] | 99 | |
---|
[102] | 100 | @Override |
---|
| 101 | public String getName() { |
---|
| 102 | return parent + " buffered dispatcher"; |
---|
[101] | 103 | } |
---|
| 104 | |
---|
[102] | 105 | @Override |
---|
| 106 | protected void joinableStart() { |
---|
| 107 | Dispatching.use(targetDispatcher, this); |
---|
| 108 | } |
---|
[101] | 109 | |
---|
[102] | 110 | @Override |
---|
| 111 | protected void joinableInterrupt() { |
---|
| 112 | Dispatching.drop(targetDispatcher, this); |
---|
| 113 | |
---|
| 114 | finishJoinable(); |
---|
| 115 | } |
---|
| 116 | |
---|
| 117 | @Override |
---|
| 118 | protected void joinableFinish() { |
---|
| 119 | |
---|
| 120 | } |
---|
| 121 | |
---|
| 122 | @Override |
---|
| 123 | protected void joinableJoin() throws InterruptedException { |
---|
| 124 | Dispatching.join(targetDispatcher); |
---|
| 125 | } |
---|
| 126 | |
---|
| 127 | @Override |
---|
| 128 | public void childChangedState(Joinable joinable, JoinableState state) { |
---|
| 129 | proceedToState(state); |
---|
| 130 | } |
---|
[101] | 131 | } |
---|