source: java/main/src/main/java/com/framsticks/communication/Connection.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 9.5 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.params.Source;
4import com.framsticks.params.annotations.AutoAppendAnnotation;
5import com.framsticks.params.annotations.FramsClassAnnotation;
6import com.framsticks.params.annotations.ParamAnnotation;
7import com.framsticks.util.ExceptionHandler;
8import com.framsticks.util.FramsticksException;
9import com.framsticks.util.io.Encoding;
10import com.framsticks.util.lang.Strings;
11
12import org.apache.logging.log4j.Level;
13import org.apache.logging.log4j.Logger;
14import org.apache.logging.log4j.LogManager;
15import java.io.BufferedReader;
16import java.io.IOException;
17import java.io.InputStreamReader;
18import java.io.OutputStreamWriter;
19import java.io.PrintWriter;
20import java.net.Socket;
21import java.util.Collection;
22import java.util.HashSet;
23import java.util.Set;
24
25import com.framsticks.util.dispatching.AbstractJoinable;
26import com.framsticks.util.dispatching.Dispatcher;
27import com.framsticks.util.dispatching.Dispatching;
28import com.framsticks.util.dispatching.Joinable;
29import com.framsticks.util.dispatching.JoinableCollection;
30import com.framsticks.util.dispatching.JoinableParent;
31import com.framsticks.util.dispatching.JoinableState;
32import com.framsticks.util.dispatching.RunAt;
33import com.framsticks.util.dispatching.Thread;
34import com.framsticks.util.dispatching.ThrowExceptionHandler;
35
36@FramsClassAnnotation
37public abstract class Connection extends AbstractJoinable implements JoinableParent, ExceptionHandler {
38
39        protected final static Logger log = LogManager.getLogger(Connection.class);
40
41        private PrintWriter output = null;
42        private BufferedReader input = null;
43
44        protected Socket socket = null;
45
46        protected Address address;
47        protected String description = "connection";
48
49        protected final Thread<Connection> senderThread = new Thread<>();
50        protected final Thread<Connection> receiverThread = new Thread<>();
51        protected final JoinableCollection<Thread<Connection>> threads = new JoinableCollection<>();
52        protected final Set<ConnectionListener> listeners = new HashSet<>();
53
54        protected ExceptionHandler exceptionHandler = ThrowExceptionHandler.getInstance();
55
56        /**
57         *
58         */
59        public Connection() {
60                threads.add(senderThread);
61                threads.add(receiverThread);
62        }
63
64        protected void updateNames() {
65                if (address == null) {
66                        return;
67                }
68                senderThread.setName(description + " thread " + address + " sender");
69                receiverThread.setName(description + " thread " + address + " receiver");
70                threads.setObservableName(address + " connection threads");
71        }
72
73        public void setDescription(String description) {
74                this.description = description;
75                updateNames();
76        }
77
78        @AutoAppendAnnotation
79        public Connection setAddress(Address address) {
80                this.address = address;
81                updateNames();
82                return this;
83        }
84
85        @ParamAnnotation
86        public Connection setAddress(String address) {
87                return setAddress(new Address(address));
88        }
89
90        static final int BUFFER_LENGTH = 1024;
91
92        int readChars = 0;
93        int iterator = 0;
94        int bufferStart = 0;
95        final char[] readBuffer = new char[BUFFER_LENGTH];
96
97        protected String getLine() {
98                final StringBuilder lineBuffer = new StringBuilder();
99                try {
100                        while (!Thread.interrupted()) {
101                                while (iterator < readChars) {
102                                        if (readBuffer[iterator] != '\n') {
103                                                ++iterator;
104                                                continue;
105                                        }
106                                        /** Do not append new line. */
107                                        lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart);
108                                        ++iterator;
109                                        bufferStart = iterator;
110                                        String line = lineBuffer.toString();
111
112                                        synchronized (listeners) {
113                                                for (ConnectionListener l : listeners) {
114                                                        l.connectionIncomming(line);
115                                                }
116                                        }
117
118                                        return line;
119                                }
120                                final int length = readChars - bufferStart;
121                                if (length > 0) {
122                                        assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH;
123                                        assert bufferStart + length <= BUFFER_LENGTH;
124                                        lineBuffer.append(readBuffer, bufferStart, length);
125                                }
126
127                                readChars = 0;
128                                readChars = input.read(readBuffer);
129                                if (readChars < 0) {
130                                        throw new SocketClosedException().msg("socket is closed");
131                                }
132                                iterator = 0;
133                                bufferStart = 0;
134                        }
135                        throw new InterruptedException();
136                } catch (Exception e) {
137                        log.debug("failed to read line (closing): {}", e.getMessage());
138                        throw new SocketClosedException().msg("failed to read line").cause(e);
139                }
140        }
141
142        protected void putLine(String line) {
143                synchronized (listeners) {
144                        for (ConnectionListener l : listeners) {
145                                l.connectionOutgoing(line);
146                        }
147                }
148                output.println(line);
149        }
150
151        protected void flushOut() {
152                output.flush();
153        }
154
155        protected abstract void processNextInputBatch();
156
157
158        protected final void processInputBatchesUntilClosed() {
159                while (isRunning() && !socket.isClosed()) {
160                        try {
161                                processNextInputBatch();
162                        } catch (SocketClosedException e) {
163                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "socket is closing: {}", e.getShortMessage(new StringBuilder()));
164                                // log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
165                                break;
166                        } catch (FramsticksException e) {
167                                log.debug("{} caught exception in receiver thread {}", this, e.getMessage());
168                                handle(e);
169                        } catch (Exception e) {
170                                log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e);
171                                break;
172                        }
173                }
174                log.debug("{} finished processing input", this);
175        }
176
177        protected abstract void receiverThreadRoutine();
178
179        // @SuppressWarnings("NN_NAKED_NOTIFY")
180        protected void setupStreams() {
181                try {
182                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
183                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
184                        synchronized (this) {
185                                this.notifyAll();
186                        }
187                } catch (IOException e) {
188                        throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this);
189                }
190        }
191
192        @Override
193        protected void joinableFinish() {
194                try {
195                        if (output != null) {
196                                output.close();
197                                output = null;
198                        }
199
200                        if (input != null) {
201                                input.close();
202                                input = null;
203                        }
204
205                        if (socket != null) {
206                                socket.close();
207                                socket = null;
208                        }
209                } catch (Exception e) {
210                        log.error("failed to stop connection: ", e);
211                }
212                log.debug("connection closed");
213        }
214
215        @Override
216        public void childChangedState(Joinable joinable, JoinableState state) {
217                proceedToState(state);
218        }
219
220        @Override
221        public String getName() {
222                return address != null ? description + " " + address : description;
223        }
224
225        @Override
226        protected void joinableStart() {
227                Dispatching.use(threads, this);
228
229                senderThread.dispatch(new RunAt<Connection>(this) {
230                        @Override
231                        protected void runAt() {
232                                synchronized (Connection.this) {
233                                        while (state.equals(JoinableState.RUNNING) && output == null) {
234                                                Dispatching.wait(Connection.this, 500);
235                                        }
236                                }
237                        }
238                });
239
240                receiverThread.dispatch(new RunAt<Connection>(this) {
241                        @Override
242                        protected void runAt() {
243                                receiverThreadRoutine();
244                                interruptJoinable();
245                                // finishJoinable();
246                        }
247                });
248        }
249
250        @Override
251        protected void joinableInterrupt() {
252                Dispatching.drop(threads, this);
253                finishJoinable();
254        }
255
256        @Override
257        protected void joinableJoin() throws InterruptedException {
258                Dispatching.join(threads);
259        }
260
261        protected static void startClientConnection(Connection connection) {
262                while (connection.isRunning() && connection.socket == null) {
263                        log.debug("connecting to {}", connection.address);
264                        try {
265                                connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort());
266                        } catch (IOException e) {
267                                log.warn("{} failed to connect (retrying): {}", connection, e.getMessage());
268                                Dispatching.sleep(0.5);
269                        }
270                }
271
272                log.debug("{} connected", connection);
273                try {
274                        // connection.socket.setSoTimeout(500);
275                        connection.setupStreams();
276                } catch (Exception e) {
277                        throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection);
278                }
279        }
280
281        /**
282         * @return the address
283         */
284        @ParamAnnotation
285        public String getAddress() {
286                return Strings.toStringNullProof(address, "?");
287        }
288
289        public Address getAddressObject() {
290                return address;
291        }
292
293        /**
294         * @return the listeners
295         */
296        public Collection<ConnectionListener> getListeners() {
297                return listeners;
298        }
299
300        /**
301         * @return the handler
302         */
303        public ExceptionHandler getExceptionHandler() {
304                return exceptionHandler;
305        }
306
307        /**
308         * @param handler the handler to set
309         */
310        public void setExceptionHandler(ExceptionHandler handler) {
311                this.exceptionHandler = handler;
312        }
313
314        public static <T extends Connection> T to(T connection, Address address) {
315                connection.setAddress(address);
316                return connection;
317        }
318
319        @Override
320        public void handle(FramsticksException exception) {
321                log.debug("{} handling {}", this, exception.getMessage());
322                exceptionHandler.handle(exception);
323        }
324
325        public Dispatcher<Connection> getReceiverDispatcher() {
326                return receiverThread;
327        }
328
329        public Dispatcher<Connection> getSenderDispatcher() {
330                return senderThread;
331        }
332
333
334        protected static String idToString(Integer id) {
335                return id != null ? " " + id.toString() : "";
336        }
337
338        protected final void putFile(File file, Integer outId) {
339                putLine("file" + idToString(outId)/* + " " + f.getPath()*/);
340                Source content = file.getContent();
341                String line;
342                while ((line = content.readLine()) != null) {
343                        putLine(line);
344                }
345                putLine("eof");
346        }
347
348        public final void sendFile(final String header, final File file, final Integer id, ExceptionHandler handler) {
349                senderThread.dispatch(new RunAt<Connection>(handler) {
350                        @Override
351                        protected void runAt() {
352                                if (header != null) {
353                                        putLine(header);
354                                }
355                                putFile(file, id);
356                                flushOut();
357                        }
358                });
359        }
360
361        public synchronized boolean isConnected() {
362                return socket != null && socket.isConnected();
363        }
364
365}
Note: See TracBrowser for help on using the repository browser.