source: java/main/src/main/java/com/framsticks/util/dispatching/BufferedDispatcher.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic and classes representing prime experiment state.

Add classes for PrimeExperiment state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment.

More development with NetLoadSaveLogic and PrimeExperiment.

Improvement around prime.

Improve BufferedDispatcher.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher.

Make BufferedDispatcher a Dispatcher (and Joinable).

Minor improvements.

Finish StackedJoinable, improve Experiment.

Develop StackedJoinable.

Add StackedJoinable utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange on Simulators.

Improve hints in ListChange.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException.

Add SimulatorConnector.

Work out the concept of an experiment composed of logic building blocks.

Rename SinkInterface to Sink.

Move saving of Accesses into AccessOperations.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 3.1 KB
Line 
1package com.framsticks.util.dispatching;
2
3
4import org.apache.logging.log4j.Logger;
5import org.apache.logging.log4j.LogManager;
6
7import com.framsticks.util.FramsticksException;
8import com.framsticks.util.lang.Strings;
9// import static com.framsticks.util.dispatching.AbstractJoinable.wrap;
10
11public class BufferedDispatcher<C> extends AbstractJoinable implements Dispatcher<C>, JoinableParent {
12        private static final Logger log = LogManager.getLogger(BufferedDispatcher.class);
13
14        protected final RunnableQueue<C> queue = new RunnableQueue<>();
15
16        protected Dispatcher<C> targetDispatcher;
17
18        protected boolean buffer = true;
19        protected Dispatcher<C> parent;
20
21        /**
22         * @param parent
23         */
24        public BufferedDispatcher(Dispatcher<C> parent) {
25                this.parent = parent;
26        }
27
28        /**
29         * @return the targetDispatcher
30         */
31        public synchronized Dispatcher<C> getTargetDispatcher() {
32                return targetDispatcher;
33        }
34
35        /**
36         * @param targetDispatcher the targetDispatcher to set
37         */
38        public synchronized void setTargetDispatcher(Dispatcher<C> targetDispatcher) {
39                if (this.targetDispatcher != null) {
40                        throw new FramsticksException().msg("dispatcher is already set").arg("dispatcher", this.targetDispatcher).arg("in", this);
41                }
42                if (targetDispatcher == null) {
43                        throw new FramsticksException().msg("trying to set empty target dispatcher").arg("in", this);
44                }
45                log.debug("setting {} to {}", this, targetDispatcher);
46                this.targetDispatcher = targetDispatcher;
47                flushQueue();
48        }
49
50        public synchronized boolean isActive() {
51                if (targetDispatcher == null || buffer) {
52                        return false;
53                        // throw new FramsticksException().msg("no dispatcher is set for tree yet").arg("tree", this);
54                }
55                return targetDispatcher.isActive();
56        }
57
58        public synchronized void dispatch(final RunAt<? extends C> runnable) {
59                if (targetDispatcher != null && !buffer) {
60                        targetDispatcher.dispatch(runnable);
61                        return;
62                }
63                queue.push(runnable);
64        }
65
66        protected void flushQueue() {
67                if (this.buffer || targetDispatcher == null) {
68                        return;
69                }
70                log.debug("flushing {} tasks in {}", queue.size(), this);
71                while (!queue.isEmpty()) {
72                        targetDispatcher.dispatch(queue.pollFirst());
73                }
74        }
75
76        public synchronized void setBuffer(final boolean buffer) {
77                if (this.buffer == buffer) {
78                        return;
79                }
80                if (buffer) {
81                        this.buffer = true;
82                        return;
83                }
84                this.buffer = false;
85                flushQueue();
86        }
87
88        @Override
89        public String toString() {
90                return parent + " -> " + Strings.toStringNullProof(targetDispatcher, "<null>");
91        }
92
93        public void createThreadIfNeeded() {
94                if (targetDispatcher != null) {
95                        return;
96                }
97                this.setTargetDispatcher(new Thread<C>().setName(parent.getName()));
98        }
99
100        @Override
101        public String getName() {
102                return parent + " buffered dispatcher";
103        }
104
105        @Override
106        protected void joinableStart() {
107                Dispatching.use(targetDispatcher, this);
108        }
109
110        @Override
111        protected void joinableInterrupt() {
112                Dispatching.drop(targetDispatcher, this);
113
114                finishJoinable();
115        }
116
117        @Override
118        protected void joinableFinish() {
119
120        }
121
122        @Override
123        protected void joinableJoin() throws InterruptedException {
124                Dispatching.join(targetDispatcher);
125        }
126
127        @Override
128        public void childChangedState(Joinable joinable, JoinableState state) {
129                proceedToState(state);
130        }
131}
Note: See TracBrowser for help on using the repository browser.