source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 193

Last change on this file since 193 was 193, checked in by Maciej Komosinski, 10 years ago

Set svn:eol-style native for all textual files

  • Property svn:eol-style set to native
File size: 13.7 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
7import com.framsticks.communication.queries.ProtocolRequest;
8import com.framsticks.communication.queries.RegisterRequest;
9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.params.ListSource;
12import com.framsticks.structure.Path;
13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
17import com.framsticks.util.dispatching.FutureHandler;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.JoinableState;
20import com.framsticks.util.lang.Casting;
21import com.framsticks.util.lang.Pair;
22import com.framsticks.util.lang.Strings;
23import com.framsticks.params.EventListener;
24
25import org.apache.logging.log4j.Logger;
26import org.apache.logging.log4j.LogManager;
27
28import java.util.*;
29import java.util.regex.Matcher;
30
31import javax.annotation.Nonnull;
32import javax.annotation.Nullable;
33
34import com.framsticks.util.dispatching.RunAt;
35
36/**
37 * @author Piotr Sniegowski
38 */
39public class ClientSideManagedConnection extends ManagedConnection {
40
41        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
42
43        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
44        private boolean isHandshakeDone = false;
45
46        protected NeedFileAcceptor needFileAcceptor;
47
48        /**
49         * @return the needFileAcceptor
50         */
51        public NeedFileAcceptor getNeedFileAcceptor() {
52                return needFileAcceptor;
53        }
54
55        /**
56         * @param needFileAcceptor the needFileAcceptor to set
57         */
58        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
59                this.needFileAcceptor = needFileAcceptor;
60        }
61
62        /**
63         * @return the requestedVersion
64         */
65        public int getRequestedVersion() {
66                return requestedVersion;
67        }
68
69        /**
70         * @param requestedVersion the requestedVersion to set
71         */
72        public void setRequestedVersion(int requestedVersion) {
73                this.requestedVersion = requestedVersion;
74        }
75
76        protected int requestedVersion = 4;
77
78        public ClientSideManagedConnection() {
79                setDescription("client connection");
80                protocolVersion = -1;
81                requestedFeatures.add("request_id");
82                // requestedFeatures.add("call_empty_result");
83                requestedFeatures.add("needfile_id");
84        }
85
86        protected List<String> readFileContent() {
87                List<String> content = new LinkedList<String>();
88                String line;
89                boolean longValue = false;
90                while (true) {
91                        line = getLine();
92                        if (longValue) {
93                                if (line.endsWith("~") && !line.endsWith("\\~")) {
94                                        longValue = false;
95                                }
96                        } else {
97                                if (line.equals("eof")) {
98                                        break;
99                                }
100                                if (line.endsWith(":~")) {
101                                        longValue = true;
102                                }
103                        }
104                        content.add(line);
105                }
106                return content;
107        }
108
109        private static class SentQuery<C> {
110
111                Request request;
112                ClientSideResponseFuture callback;
113                Dispatcher<C> dispatcher;
114                protected final List<File> files = new ArrayList<File>();
115
116                public List<File> getFiles() {
117                        return files;
118                }
119
120                @Override
121                public String toString() {
122                        return request.toString();
123                }
124
125                public void dispatchResponseProcess(final Response response) {
126                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
127                                @Override
128                                protected void runAt() {
129                                        callback.pass(response);
130                                }
131                        });
132                }
133        }
134
135        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
136                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
137        }
138
139
140
141        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
142                synchronized (applicationRequestsBuffer) {
143                        if (!isHandshakeDone) {
144                                applicationRequestsBuffer.add(new Runnable() {
145                                        @Override
146                                        public void run() {
147                                                sendImplementation(request, dispatcher, callback);
148                                        }
149                                });
150                                return;
151                        }
152                }
153                sendImplementation(request, dispatcher, callback);
154        }
155
156        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
157                callback.setRequest(request);
158
159                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
160                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
161                }
162
163                final SentQuery<C> sentQuery = new SentQuery<C>();
164                sentQuery.request = request;
165                sentQuery.callback = callback;
166                sentQuery.dispatcher = dispatcher;
167
168
169                senderThread.dispatch(new RunAt<Connection>(callback) {
170                        @Override
171                        protected void runAt() {
172                                Integer id = sentQueries.put(null, sentQuery);
173
174                                String command = sentQuery.request.getCommand();
175                                StringBuilder message = new StringBuilder();
176                                message.append(command);
177                                if (id != null) {
178                                        message.append(" ").append(id);
179                                }
180                                message.append(" ");
181                                sentQuery.request.construct(message);
182                                String out = message.toString();
183
184                                putLine(out);
185                                flushOut();
186                                log.debug("sending query: {}", out);
187                        }
188                });
189        }
190
191        @Override
192        public String toString() {
193                return "client connection " + address;
194        }
195
196        private void sendNextUseRequest(final Iterator<String> featuresIterator, final FutureHandler<Void> future) {
197                if (!featuresIterator.hasNext()) {
198                        future.pass(null);
199                        return;
200                }
201                final String feature = featuresIterator.next();
202
203                send(new UseRequest().feature(feature), new ClientSideResponseFuture(future) {
204
205                        @Override
206                        protected void processOk(Response response) {
207                                if (feature.equals("request_id")) {
208                                        requestIdEnabled = true;
209                                }
210                                sendNextUseRequest(featuresIterator, future);
211                        }
212                });
213        }
214
215        private void sendQueryVersion(final int version, final FutureHandler<Void> future) {
216                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
217                        @Override
218                        protected void processOk(Response response) {
219                                protocolVersion = version;
220                                if (version < requestedVersion) {
221                                        /** it is an implicit loop here*/
222                                        sendQueryVersion(version + 1, future);
223                                        return;
224                                }
225                                sendNextUseRequest(requestedFeatures.iterator(), future);
226
227                        }
228                });
229        }
230
231        protected class IdCollection<T> {
232
233
234                protected final Map<Integer, T> map = new HashMap<>();
235                protected T current;
236
237                public Integer put(Integer idProposition, T value) {
238                        synchronized (ClientSideManagedConnection.this) {
239                                while (!(requestIdEnabled || current == null)) {
240                                        try {
241                                                ClientSideManagedConnection.this.wait();
242                                        } catch (InterruptedException ignored) {
243                                                break;
244                                        }
245                                }
246                                if (!requestIdEnabled) {
247                                        current = value;
248                                        return null;
249                                }
250                                if (idProposition == null) {
251                                        idProposition = nextQueryId++;
252                                }
253                                map.put(idProposition, value);
254                                return idProposition;
255                        }
256                }
257
258                public void clear(Integer id) {
259                        if (requestIdEnabled) {
260                                current = null;
261                        } else {
262                                map.remove(id);
263                        }
264                }
265
266                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
267                        synchronized (ClientSideManagedConnection.this) {
268                                try {
269                                        if (id == null) {
270                                                if (requestIdEnabled) {
271                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
272                                                }
273                                                T result = current;
274                                                current = null;
275                                                ClientSideManagedConnection.this.notifyAll();
276                                                return result;
277                                        }
278                                        if (!map.containsKey(id)) {
279                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
280                                        }
281
282                                        T result = map.get(id);
283                                        if (remove) {
284                                                map.remove(id);
285                                        }
286                                        return result;
287
288                                } catch (FramsticksException e) {
289                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
290                                }
291                        }
292                }
293        }
294
295        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
296        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
297
298        private int nextQueryId = 0;
299
300        protected void processEvent(String rest) {
301                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
302                if (!matcher.matches()) {
303                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
304                }
305                String fileLine = getLine();
306                if (!fileLine.equals("file")) {
307                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
308                }
309                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
310                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
311                final File file = new File("", new ListSource(readFileContent()));
312                log.debug("firing event {}", eventObjectPath);
313                EventListener<File> listener;
314                synchronized (registeredListeners) {
315                        listener = registeredListeners.get(eventObjectPath);
316                }
317                if (listener == null) {
318                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
319                }
320                listener.action(file);
321        }
322
323        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
324                final Integer id = rest.first;
325                String suggestedName = null;
326                String description = null;
327                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
328                if (s != null) {
329                        suggestedName = s.first.toString();
330                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
331                        if (d != null) {
332                                description = d.first.toString();
333                        }
334                }
335
336                final FutureHandler<File> future = new FutureHandler<File>() {
337
338                        protected void send(final File result) {
339                                log.debug("sending file: " + result);
340                                needFiles.clear(id);
341                                sendFile(null, result, id, ClientSideManagedConnection.this);
342                        }
343
344                        @Override
345                        protected void result(File result) {
346                                send(result);
347                        }
348
349                        @Override
350                        public void handle(FramsticksException exception) {
351                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
352                        }
353                };
354
355                NeedFile needFile = new NeedFile(suggestedName, description, future);
356
357                if (needFileAcceptor.acceptNeed(needFile)) {
358                        return;
359                }
360
361                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
362        }
363
364        protected void processFile(Pair<Integer, CharSequence> rest) {
365                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
366
367                String currentFilePath = rest.second.toString();
368                if (!Strings.notEmpty(currentFilePath)) {
369                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
370                }
371
372                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
373        }
374
375        protected void processMessageStartingWith(final String header) {
376                try {
377                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
378                        if (command == null) {
379                                throw new FramsticksException().msg("failed to parse command");
380                        }
381                        final CharSequence keyword = command.first;
382                        if (keyword.equals("event")) {
383                                processEvent(command.second.toString());
384                                return;
385                        }
386
387                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
388                        if (rest == null) {
389                                throw new FramsticksException().msg("failed to parse optional id and remainder");
390                        }
391
392                        if (keyword.equals("file")) {
393                                processFile(rest);
394                                return;
395                        }
396                        if (keyword.equals("ok") || keyword.equals("error")) {
397
398                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
399
400                                log.debug("parsing response for request {}", sentQuery);
401
402                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
403                                return;
404                        }
405                        if (keyword.equals("needfile")) {
406                                processNeedFile(rest);
407                                return;
408                        }
409
410                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
411                } catch (FramsticksException e) {
412                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
413                }
414        }
415
416        protected final ExceptionHandler closeOnFailure = new ExceptionHandler() {
417
418                @Override
419                public void handle(FramsticksException exception) {
420                        interruptJoinable();
421                        // finish();
422                }
423        };
424
425        @Override
426        protected void receiverThreadRoutine() {
427                startClientConnection(this);
428
429                sendQueryVersion(1, new Future<Void>(closeOnFailure) {
430
431                        @Override
432                        protected void result(Void result) {
433                                synchronized (applicationRequestsBuffer) {
434                                        isHandshakeDone = true;
435                                        for (Runnable r : applicationRequestsBuffer) {
436                                                r.run();
437                                        }
438                                        applicationRequestsBuffer.clear();
439                                }
440                        }
441                });
442
443                processInputBatchesUntilClosed();
444        }
445
446        protected void processNextInputBatch() {
447                processMessageStartingWith(getLine());
448        }
449
450        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
451
452        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
453                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
454                        @Override
455                        protected void processOk(Response response) {
456                                synchronized (registeredListeners) {
457                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
458                                }
459                                future.pass(null);
460                        }
461                });
462        }
463
464        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final FutureHandler<Void> future) {
465                String eventPath = null;
466                synchronized (registeredListeners) {
467                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
468                                if (e.getValue() == listener) {
469                                        eventPath = e.getKey();
470                                        break;
471                                }
472                        }
473                }
474                if (eventPath == null) {
475                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
476                        return;
477                }
478
479                final String finalEventPath = eventPath;
480                //TODO add arguments to the exception
481                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
482
483                        @Override
484                        protected void processOk(Response response) {
485                                synchronized (registeredListeners) {
486                                        registeredListeners.remove(finalEventPath);
487                                }
488                                future.pass(null);
489                        }
490                });
491        }
492}
Note: See TracBrowser for help on using the repository browser.