source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 102

Last change on this file since 102 was 102, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

for Joinables running

CHANGELOG:
Add WorkPackageLogic? and classes representing prime experiment state.

Add classes for PrimeExperiment? state.

Extract single netload routine in Simulator.

Working netload with dummy content in PrimeExperiment?.

More development with NetLoadSaveLogic? and PrimeExperiment?.

Improvement around prime.

Improve BufferedDispatcher?.isActive logic.

Add prime-all.xml configuration.

Manual connecting to existing simulators from GUI.

Guard in SimulatorConnector? against expdef mismatch.

Guard against empty target dispatcher in BufferedDispatcher?.

Make BufferedDispatcher? a Dispatcher (and Joinable).

Minor improvements.

Done StackedJoinable?, improve Experiment.

Develop StackedJoinable?.

Add StackedJoinable? utility joinables controller.

Add dependency on apache-commons-lang.

Add ready ListChange? on Simulators.

Improve hints in ListChange?.

Several improvements.

Found bug with dispatching in Experiment.

Minor improvements.

Fix bug with early finishing Server.

Many changes in Dispatching.

Fix bug with connection.

Do not obfuscate log with socket related exceptions.

Add SocketClosedException?.

Add SimulatorConnector?.

Work out conception of experiment composing of logics building blocks.

Rename SinkInterface? to Sink.

Move saving of Accesses into AccessOperations?.

Some improvements to Experiment.

Improve joinables.

Fix issue with joinables closing.

Add direct and managed consoles to popup menu.

File size: 9.6 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.params.Source;
4import com.framsticks.params.annotations.AutoAppendAnnotation;
5import com.framsticks.params.annotations.FramsClassAnnotation;
6import com.framsticks.params.annotations.ParamAnnotation;
7import com.framsticks.util.FramsticksException;
8import com.framsticks.util.io.Encoding;
9import com.framsticks.util.lang.Strings;
10
11import org.apache.logging.log4j.Level;
12import org.apache.logging.log4j.Logger;
13import org.apache.logging.log4j.LogManager;
14import java.io.BufferedReader;
15import java.io.IOException;
16import java.io.InputStreamReader;
17import java.io.OutputStreamWriter;
18import java.io.PrintWriter;
19import java.net.Socket;
20import java.util.Collection;
21import java.util.HashSet;
22import java.util.Set;
23
24import com.framsticks.util.dispatching.AbstractJoinable;
25import com.framsticks.util.dispatching.Dispatcher;
26import com.framsticks.util.dispatching.Dispatching;
27import com.framsticks.util.dispatching.ExceptionResultHandler;
28import com.framsticks.util.dispatching.Joinable;
29import com.framsticks.util.dispatching.JoinableCollection;
30import com.framsticks.util.dispatching.JoinableParent;
31import com.framsticks.util.dispatching.JoinableState;
32import com.framsticks.util.dispatching.RunAt;
33import com.framsticks.util.dispatching.Thread;
34import com.framsticks.util.dispatching.ThrowExceptionHandler;
35
36@FramsClassAnnotation
37public abstract class Connection extends AbstractJoinable implements JoinableParent, ExceptionResultHandler {
38
39        protected final static Logger log = LogManager.getLogger(Connection.class);
40
41        private PrintWriter output = null;
42        private BufferedReader input = null;
43
44        protected Socket socket = null;
45
46        protected Address address;
47        protected String description = "connection";
48
49        protected final Thread<Connection> senderThread = new Thread<>();
50        protected final Thread<Connection> receiverThread = new Thread<>();
51        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
52        protected final Set<ConnectionListener> listeners = new HashSet<>();
53
54        protected ExceptionResultHandler exceptionHandler = ThrowExceptionHandler.getInstance();
55
56        /**
57         *
58         */
59        public Connection() {
60                threads.add(senderThread);
61                threads.add(receiverThread);
62        }
63
64        protected void updateNames() {
65                if (address == null) {
66                        return;
67                }
68                senderThread.setName(description + " thread " + address + " sender");
69                receiverThread.setName(description + " thread " + address + " receiver");
70                threads.setObservableName(address + " connection threads");
71        }
72
73        public void setDescription(String description) {
74                this.description = description;
75                updateNames();
76        }
77
78        @AutoAppendAnnotation
79        public Connection setAddress(Address address) {
80                this.address = address;
81                updateNames();
82                return this;
83        }
84
85        @ParamAnnotation
86        public Connection setAddress(String address) {
87                return setAddress(new Address(address));
88        }
89
90        static final int BUFFER_LENGTH = 1024;
91
92        int readChars = 0;
93        int iterator = 0;
94        int bufferStart = 0;
95        final char[] readBuffer = new char[BUFFER_LENGTH];
96
97        protected String getLine() {
98                final StringBuilder lineBuffer = new StringBuilder();
99                try {
100                        while (!Thread.interrupted()) {
101                                while (iterator < readChars) {
102                                        if (readBuffer[iterator] != '\n') {
103                                                ++iterator;
104                                                continue;
105                                        }
106                                        /** Do not append new line. */
107                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
108                                        ++iterator;
109                                        bufferStart = iterator;
110                                        String line = lineBuffer.toString();
111
112                                        synchronized (listeners) {
113                                                for (ConnectionListener l : listeners) {
114                                                        l.connectionIncomming(line);
115                                                }
116                                        }
117
118                                        return line;
119                                }
120                                final int length = readChars - bufferStart;
121                                if (length > 0) {
122                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
123                                        assert bufferStart + length <= BUFFER_LENGTH;
124                                        lineBuffer.append(readBuffer, bufferStart, length);
125                                }
126
127                                readChars = 0;
128                                readChars = input.read(readBuffer);
129                                if (readChars < 0) {
130                                        throw new SocketClosedException().msg("socket is closed");
131                                }
132                                iterator = 0;
133                                bufferStart = 0;
134                        }
135                        throw new InterruptedException();
136                } catch (Exception e) {
137                        log.debug("failed to read line (closing): {}", e.getMessage());
138                        throw new SocketClosedException().msg("failed to read line").cause(e);
139                }
140        }
141
142        protected void putLine(String line) {
143                synchronized (listeners) {
144                        for (ConnectionListener l : listeners) {
145                                l.connectionOutgoing(line);
146                        }
147                }
148                output.println(line);
149        }
150
151        protected void flushOut() {
152                output.flush();
153        }
154
155        protected abstract void processNextInputBatch();
156
157
158        protected final void processInputBatchesUntilClosed() {
159                while (isRunning() && !socket.isClosed()) {
160                        try {
161                                processNextInputBatch();
162                        } catch (SocketClosedException e) {
163                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "socket is closing: {}", e.getShortMessage(new StringBuilder()));
164                                // log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
165                                break;
166                        } catch (FramsticksException e) {
167                                log.debug("{} caught exception in receiver thread {}", this, e.getMessage());
168                                handle(e);
169                        } catch (Exception e) {
170                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
171                                break;
172                        }
173                }
174                log.debug("{} finished processing input", this);
175        }
176
177        protected abstract void receiverThreadRoutine();
178
179        // @SuppressWarnings("NN_NAKED_NOTIFY")
180        protected void setupStreams() {
181                try {
182                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
183                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
184                        synchronized (this) {
185                                this.notifyAll();
186                        }
187                } catch (IOException e) {
188                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
189                }
190        }
191
192        @Override
193        protected void joinableFinish() {
194                try {
195                        if (output != null) {
196                                output.close();
197                                output = null;
198                        }
199
200                        if (input != null) {
201                                input.close();
202                                input = null;
203                        }
204
205                        if (socket != null) {
206                                socket.close();
207                                socket = null;
208                        }
209                } catch (Exception e) {
210                        log.error("failed to stop connection: ", e);
211                }
212                log.debug("connection closed");
213        }
214
215        @Override
216        public void childChangedState(Joinable joinable, JoinableState state) {
217                proceedToState(state);
218        }
219
220        @Override
221        public String getName() {
222                return address != null ? description + " " + address : description;
223        }
224
225        @Override
226        protected void joinableStart() {
227                Dispatching.use(threads, this);
228
229                senderThread.dispatch(new RunAt<Connection>(this) {
230                        @Override
231                        protected void runAt() {
232                                synchronized (Connection.this) {
233                                        while (state.equals(JoinableState.RUNNING) && output == null) {
234                                                Dispatching.wait(Connection.this, 500);
235                                        }
236                                }
237                        }
238                });
239
240                receiverThread.dispatch(new RunAt<Connection>(this) {
241                        @Override
242                        protected void runAt() {
243                                receiverThreadRoutine();
244                                interruptJoinable();
245                                // finishJoinable();
246                        }
247                });
248        }
249
250        @Override
251        protected void joinableInterrupt() {
252                Dispatching.drop(threads, this);
253                finishJoinable();
254        }
255
256        @Override
257        protected void joinableJoin() throws InterruptedException {
258                Dispatching.join(threads);
259        }
260
261        protected static void startClientConnection(Connection connection) {
262                while (connection.isRunning() && connection.socket == null) {
263                        log.debug("connecting to {}", connection.address);
264                        try {
265                                connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort());
266                        } catch (IOException e) {
267                                log.warn("{} failed to connect (retrying): {}", connection, e.getMessage());
268                                Dispatching.sleep(0.5);
269                        }
270                }
271
272                log.debug("{} connected", connection);
273                try {
274                        // connection.socket.setSoTimeout(500);
275                        connection.setupStreams();
276                } catch (Exception e) {
277                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection);
278                }
279        }
280
281        /**
282         * @return the address
283         */
284        @ParamAnnotation
285        public String getAddress() {
286                return Strings.toStringNullProof(address, "?");
287        }
288
289        public Address getAddressObject() {
290                return address;
291        }
292
293        /**
294         * @return the listeners
295         */
296        public Collection<ConnectionListener> getListeners() {
297                return listeners;
298        }
299
300        /**
301         * @return the handler
302         */
303        public ExceptionResultHandler getExceptionHandler() {
304                return exceptionHandler;
305        }
306
307        /**
308         * @param handler the handler to set
309         */
310        public void setExceptionHandler(ExceptionResultHandler handler) {
311                this.exceptionHandler = handler;
312        }
313
314        public static <T extends Connection> T to(T connection, Address address) {
315                connection.setAddress(address);
316                return connection;
317        }
318
319        @Override
320        public void handle(FramsticksException exception) {
321                log.debug("{} handling {}", this, exception.getMessage());
322                exceptionHandler.handle(exception);
323        }
324
325        public Dispatcher<Connection> getReceiverDispatcher() {
326                return receiverThread;
327        }
328
329        public Dispatcher<Connection> getSenderDispatcher() {
330                return senderThread;
331        }
332
333
334        protected static String idToString(Integer id) {
335                return id != null ? " " + id.toString() : "";
336        }
337
338        protected final void putFile(File file, Integer outId) {
339                putLine("file" + idToString(outId)/* + " " + f.getPath()*/);
340                Source content = file.getContent();
341                String line;
342                while ((line = content.readLine()) != null) {
343                        putLine(line);
344                }
345                putLine("eof");
346        }
347
348        public final void sendFile(final String header, final File file, final Integer id, ExceptionResultHandler handler) {
349                senderThread.dispatch(new RunAt<Connection>(handler) {
350                        @Override
351                        protected void runAt() {
352                                if (header != null) {
353                                        putLine(header);
354                                }
355                                putFile(file, id);
356                                flushOut();
357                        }
358                });
359        }
360
361        public synchronized boolean isConnected() {
362                return socket != null && socket.isConnected();
363        }
364
365}
Note: See TracBrowser for help on using the repository browser.