source: java/main/src/main/java/com/framsticks/util/dispatching/Thread.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 2.6 KB
Line 
1package com.framsticks.util.dispatching;
2
3import org.apache.logging.log4j.Logger;
4import org.apache.logging.log4j.LogManager;
5
6
7import com.framsticks.params.annotations.ParamAnnotation;
8import com.framsticks.util.dispatching.RunAt;
9
10/**
11 * @author Piotr Sniegowski
12 */
13public class Thread<C> extends AbstractJoinable implements Dispatcher<C> {
14
15        private static final Logger log = LogManager.getLogger(Thread.class);
16
17        protected final java.lang.Thread thread;
18
19        protected final Object condition = new Object();
20        private RunnableQueue<C> queue = new RunnableQueue<>();
21
22        public Thread() {
23                thread = new java.lang.Thread(new Runnable() {
24                        @Override
25                        public void run() {
26                                Thread.this.routine();
27                        }
28                });
29        }
30
31        public Thread(java.lang.Thread thread) {
32                this.thread = thread;
33        }
34
35        @Override
36        protected void joinableStart() {
37                thread.start();
38        }
39
40        @Override
41        public final boolean isActive() {
42                return thread.equals(java.lang.Thread.currentThread());
43        }
44
45        protected void routine() {
46                log.debug("starting thread {}", this);
47                assert getMonitor() != null;
48                ExceptionHandler exceptionHandler = getMonitor().getTaskExceptionHandler();
49                while (!java.lang.Thread.interrupted()) {
50                        RunAt<? extends C> runnable;
51                        synchronized (condition) {
52                                if (queue.isEmpty()) {
53                                        try {
54                                                condition.wait();
55                                        } catch (InterruptedException ignored) {
56                                                break;
57                                        }
58                                        continue;
59                                }
60                                runnable = queue.pollFirst();
61                        }
62                        if (runnable != null) {
63                                try {
64                                        runnable.run();
65                                } catch (Exception e) {
66                                        if (exceptionHandler != null) {
67                                                if (exceptionHandler.handle(this, e)) {
68                                                        continue;
69                                                }
70                                        }
71                                        log.error("error in thread: ", e);
72                                }
73                        }
74                }
75                log.debug("finishing thread {}", this);
76                finishJoinable();
77        }
78
79
80        @Override
81        public void dispatch(RunAt<? extends C> runnable) {
82                synchronized (condition) {
83                        queue.push(runnable);
84                        condition.notifyAll();
85                }
86        }
87
88        public RunnableQueue<C> switchQueue(RunnableQueue<C> queue) {
89                synchronized (condition) {
90                        RunnableQueue<C> result = this.queue;
91                        this.queue = queue;
92                        return result;
93                }
94        }
95
96        @Override
97        protected void joinableInterrupt() {
98                thread.interrupt();
99        }
100
101        @Override
102        protected void joinableJoin() throws InterruptedException {
103                thread.join(500);
104                log.debug("joined {}", this);
105        }
106
107        @ParamAnnotation
108        public Thread<C> setName(String name) {
109                thread.setName(name);
110                return this;
111        }
112
113        @ParamAnnotation
114        public String getName() {
115                return thread.getName();
116        }
117
118        public static boolean interrupted() {
119                return java.lang.Thread.interrupted();
120        }
121
122        // @Override
123        // public String toString() {
124        //      return getName();
125        // }
126
127        @Override
128        protected void joinableFinish() {
129        }
130
131}
Note: See TracBrowser for help on using the repository browser.