source: java/main/src/main/java/com/framsticks/communication/ClientSideManagedConnection.java @ 103

Last change on this file since 103 was 103, checked in by psniegowski, 11 years ago

HIGHLIGHTS:

  • add auto loading and saving algorithms between

frams files format and Java classes

  • respect ValueChange? events in GUI (do not reload object)
  • support results of procedures in Java server
  • make Experiment automatically convert between frams file and NetFile? object
  • add MessageLogger? (compatible with original frams server messages)
  • WorkPackageLogic? now validates results, is able to discard them, reschedule

whole package, or only uncomputed remainder

CHANGELOG:
Show just a short description in PrimeExperiment?.

Add primes_changed event to the PrimeExperiment?.

Make WorkPackageLogic? robust to frams server returning invalid results.

Add MessageLogger? to logics.

Add NetFile? interface. Support Messages from server.

Minor changes to connections.

Merge results in the PrimeExperiment?.

More netload class->file conversion to Simulator.

Move netsave parsing to Simulator.

Fix bug with inverted ordering of events firing in Experiment.

Minor changes.

Minor logging changes.

Use AccessOperations?.convert in NetLoadSaveLogic?

NetLoadSaveLogic? now encloses the conversion.

Use more generic AccessOperations? saveAll and loadAll in PrimePackage?.

Add Result class for enclosing of call invocations' results.

Improve feature request handling in Connections.

Use AccessOperations?.convert in RemoteTree? events parsing.

Minor change.

Add some information params to Java server root and CLI objects.

A draft implementation of loadAll algorithm.

That algorithm tries to load objects into a tree structure.

Add AccessOperationsTest? test.

Develop WorkPackageLogic?.

  • add state tracking fields
  • add work package generation

Add utility class SimplePrimitive?.

Meant for Java backend classes, enclose a single primitive value
and set of listeners.

Improve primitive value refresh in GUI.

When ValueChange? found in called event, do not reload whole
object, but only update GUI (no communication is performed).

Use ValueChange? in the TestClass? test.

Minor changes.

Sending all packages in PrimeExperiment? to the frams servers.

Develop AccessOperations?.loadComposites().

Remove addAccess from MultiParamLoader? interface.

There is now no default AccessProvider? in MultiParamLoader?.
User must explicitely set AccessStash? or Registry.

Improve saving algorithms in AccessOperations?.

File size: 13.7 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.CallRequest;
5import com.framsticks.communication.queries.NeedFile;
6import com.framsticks.communication.queries.NeedFileAcceptor;
7import com.framsticks.communication.queries.ProtocolRequest;
8import com.framsticks.communication.queries.RegisterRequest;
9import com.framsticks.communication.queries.UseRequest;
10import com.framsticks.communication.queries.VersionRequest;
11import com.framsticks.core.Path;
12import com.framsticks.params.ListSource;
13import com.framsticks.util.*;
14import com.framsticks.util.dispatching.AtOnceDispatcher;
15import com.framsticks.util.dispatching.Dispatcher;
16import com.framsticks.util.dispatching.Dispatching;
17import com.framsticks.util.dispatching.ExceptionResultHandler;
18import com.framsticks.util.dispatching.Future;
19import com.framsticks.util.dispatching.FutureHandler;
20import com.framsticks.util.dispatching.JoinableState;
21import com.framsticks.util.lang.Casting;
22import com.framsticks.util.lang.Pair;
23import com.framsticks.util.lang.Strings;
24import com.framsticks.params.EventListener;
25
26import org.apache.logging.log4j.Logger;
27import org.apache.logging.log4j.LogManager;
28
29import java.util.*;
30import java.util.regex.Matcher;
31
32import javax.annotation.Nonnull;
33import javax.annotation.Nullable;
34
35import com.framsticks.util.dispatching.RunAt;
36
37/**
38 * @author Piotr Sniegowski
39 */
40public class ClientSideManagedConnection extends ManagedConnection {
41
42        private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class);
43
44        private final List<Runnable> applicationRequestsBuffer = new LinkedList<>();
45        private boolean isHandshakeDone = false;
46
47        protected NeedFileAcceptor needFileAcceptor;
48
49        /**
50         * @return the needFileAcceptor
51         */
52        public NeedFileAcceptor getNeedFileAcceptor() {
53                return needFileAcceptor;
54        }
55
56        /**
57         * @param needFileAcceptor the needFileAcceptor to set
58         */
59        public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) {
60                this.needFileAcceptor = needFileAcceptor;
61        }
62
63        /**
64         * @return the requestedVersion
65         */
66        public int getRequestedVersion() {
67                return requestedVersion;
68        }
69
70        /**
71         * @param requestedVersion the requestedVersion to set
72         */
73        public void setRequestedVersion(int requestedVersion) {
74                this.requestedVersion = requestedVersion;
75        }
76
77        protected int requestedVersion = 4;
78
79        public ClientSideManagedConnection() {
80                setDescription("client connection");
81                protocolVersion = -1;
82                requestedFeatures.add("request_id");
83                requestedFeatures.add("call_empty_result");
84                requestedFeatures.add("needfile_id");
85        }
86
87        protected List<String> readFileContent() {
88                List<String> content = new LinkedList<String>();
89                String line;
90                boolean longValue = false;
91                while (true) {
92                        line = getLine();
93                        if (longValue) {
94                                if (line.endsWith("~") && !line.endsWith("\\~")) {
95                                        longValue = false;
96                                }
97                        } else {
98                                if (line.equals("eof")) {
99                                        break;
100                                }
101                                if (line.endsWith(":~")) {
102                                        longValue = true;
103                                }
104                        }
105                        content.add(line);
106                }
107                return content;
108        }
109
110        private static class SentQuery<C> {
111
112                Request request;
113                ClientSideResponseFuture callback;
114                Dispatcher<C> dispatcher;
115                protected final List<File> files = new ArrayList<File>();
116
117                public List<File> getFiles() {
118                        return files;
119                }
120
121                @Override
122                public String toString() {
123                        return request.toString();
124                }
125
126                public void dispatchResponseProcess(final Response response) {
127                        Dispatching.dispatchIfNotActive(dispatcher, new RunAt<C>(callback) {
128                                @Override
129                                protected void runAt() {
130                                        callback.pass(response);
131                                }
132                        });
133                }
134        }
135
136        public void send(ProtocolRequest request, ClientSideResponseFuture callback) {
137                sendImplementation(request, AtOnceDispatcher.getInstance(), callback);
138        }
139
140
141
142        public <C> void send(final ApplicationRequest request, final Dispatcher<C> dispatcher, final ClientSideResponseFuture callback) {
143                synchronized (applicationRequestsBuffer) {
144                        if (!isHandshakeDone) {
145                                applicationRequestsBuffer.add(new Runnable() {
146                                        @Override
147                                        public void run() {
148                                                sendImplementation(request, dispatcher, callback);
149                                        }
150                                });
151                                return;
152                        }
153                }
154                sendImplementation(request, dispatcher, callback);
155        }
156
157        private <C> void sendImplementation(Request request, Dispatcher<C> dispatcher, ClientSideResponseFuture callback) {
158                callback.setRequest(request);
159
160                if (getState().ordinal() > JoinableState.RUNNING.ordinal()) {
161                        throw new FramsticksException().msg("connection is not connected").arg("connection", this);
162                }
163
164                final SentQuery<C> sentQuery = new SentQuery<C>();
165                sentQuery.request = request;
166                sentQuery.callback = callback;
167                sentQuery.dispatcher = dispatcher;
168
169
170                senderThread.dispatch(new RunAt<Connection>(callback) {
171                        @Override
172                        protected void runAt() {
173                                Integer id = sentQueries.put(null, sentQuery);
174
175                                String command = sentQuery.request.getCommand();
176                                StringBuilder message = new StringBuilder();
177                                message.append(command);
178                                if (id != null) {
179                                        message.append(" ").append(id);
180                                }
181                                message.append(" ");
182                                sentQuery.request.construct(message);
183                                String out = message.toString();
184
185                                putLine(out);
186                                flushOut();
187                                log.debug("sending query: {}", out);
188                        }
189                });
190        }
191
192        @Override
193        public String toString() {
194                return "client connection " + address;
195        }
196
197        private void sendNextUseRequest(final Iterator<String> featuresIterator, final Future<Void> future) {
198                if (!featuresIterator.hasNext()) {
199                        future.pass(null);
200                        return;
201                }
202                final String feature = featuresIterator.next();
203
204                send(new UseRequest().feature(feature), new ClientSideResponseFuture(future) {
205
206                        @Override
207                        protected void processOk(Response response) {
208                                if (feature.equals("request_id")) {
209                                        requestIdEnabled = true;
210                                }
211                                sendNextUseRequest(featuresIterator, future);
212                        }
213                });
214        }
215
216        private void sendQueryVersion(final int version, final Future<Void> future) {
217                send(new VersionRequest().version(version), new ClientSideResponseFuture(future) {
218                        @Override
219                        protected void processOk(Response response) {
220                                protocolVersion = version;
221                                if (version < requestedVersion) {
222                                        /** it is an implicit loop here*/
223                                        sendQueryVersion(version + 1, future);
224                                        return;
225                                }
226                                sendNextUseRequest(requestedFeatures.iterator(), future);
227
228                        }
229                });
230        }
231
232        protected class IdCollection<T> {
233
234
235                protected final Map<Integer, T> map = new HashMap<>();
236                protected T current;
237
238                public Integer put(Integer idProposition, T value) {
239                        synchronized (ClientSideManagedConnection.this) {
240                                while (!(requestIdEnabled || current == null)) {
241                                        try {
242                                                ClientSideManagedConnection.this.wait();
243                                        } catch (InterruptedException ignored) {
244                                                break;
245                                        }
246                                }
247                                if (!requestIdEnabled) {
248                                        current = value;
249                                        return null;
250                                }
251                                if (idProposition == null) {
252                                        idProposition = nextQueryId++;
253                                }
254                                map.put(idProposition, value);
255                                return idProposition;
256                        }
257                }
258
259                public void clear(Integer id) {
260                        if (requestIdEnabled) {
261                                current = null;
262                        } else {
263                                map.remove(id);
264                        }
265                }
266
267                public @Nonnull T fetch(@Nullable Integer id, boolean remove) {
268                        synchronized (ClientSideManagedConnection.this) {
269                                try {
270                                        if (id == null) {
271                                                if (requestIdEnabled) {
272                                                        throw new FramsticksException().msg("request_id is enabled and id is missing");
273                                                }
274                                                T result = current;
275                                                current = null;
276                                                ClientSideManagedConnection.this.notifyAll();
277                                                return result;
278                                        }
279                                        if (!map.containsKey(id)) {
280                                                throw new FramsticksException().msg("id is unknown").arg("id", id);
281                                        }
282
283                                        T result = map.get(id);
284                                        if (remove) {
285                                                map.remove(id);
286                                        }
287                                        return result;
288
289                                } catch (FramsticksException e) {
290                                        throw new FramsticksException().msg("failed to match response to sent query").cause(e);
291                                }
292                        }
293                }
294        }
295
296        protected IdCollection<SentQuery<?>> sentQueries = new IdCollection<>();
297        protected IdCollection<NeedFile> needFiles = new IdCollection<>();
298
299        private int nextQueryId = 0;
300
301        protected void processEvent(String rest) {
302                Matcher matcher = Request.EVENT_PATTERN.matcher(rest);
303                if (!matcher.matches()) {
304                        throw new FramsticksException().msg("invalid event line").arg("rest", rest);
305                }
306                String fileLine = getLine();
307                if (!fileLine.equals("file")) {
308                        throw new FramsticksException().msg("expected file line").arg("got", fileLine);
309                }
310                String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString();
311                String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString();
312                final File file = new File("", new ListSource(readFileContent()));
313                log.debug("firing event {}", eventObjectPath);
314                EventListener<File> listener;
315                synchronized (registeredListeners) {
316                        listener = registeredListeners.get(eventObjectPath);
317                }
318                if (listener == null) {
319                        throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath);
320                }
321                listener.action(file);
322        }
323
324        protected void processNeedFile(Pair<Integer, CharSequence> rest) {
325                final Integer id = rest.first;
326                String suggestedName = null;
327                String description = null;
328                Pair<CharSequence, CharSequence> s = Request.takeString(rest.second);
329                if (s != null) {
330                        suggestedName = s.first.toString();
331                        Pair<CharSequence, CharSequence> d = Request.takeString(s.second);
332                        if (d != null) {
333                                description = d.first.toString();
334                        }
335                }
336
337                final Future<File> future = new Future<File>() {
338
339                        protected void send(final File result) {
340                                log.debug("sending file: " + result);
341                                needFiles.clear(id);
342                                sendFile(null, result, id, ClientSideManagedConnection.this);
343                        }
344
345                        @Override
346                        protected void result(File result) {
347                                send(result);
348                        }
349
350                        @Override
351                        public void handle(FramsticksException exception) {
352                                send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage())));
353                        }
354                };
355
356                NeedFile needFile = new NeedFile(suggestedName, description, future);
357
358                if (needFileAcceptor.acceptNeed(needFile)) {
359                        return;
360                }
361
362                future.handle(new FramsticksException().msg("acceptor did not accepted need"));
363        }
364
365        protected void processFile(Pair<Integer, CharSequence> rest) {
366                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, false);
367
368                String currentFilePath = rest.second.toString();
369                if (!Strings.notEmpty(currentFilePath)) {
370                        currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath();
371                }
372
373                sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent())));
374        }
375
376        protected void processMessageStartingWith(final String header) {
377                try {
378                        final Pair<CharSequence, CharSequence> command = Request.takeIdentifier(header);
379                        if (command == null) {
380                                throw new FramsticksException().msg("failed to parse command");
381                        }
382                        final CharSequence keyword = command.first;
383                        if (keyword.equals("event")) {
384                                processEvent(command.second.toString());
385                                return;
386                        }
387
388                        final Pair<Integer, CharSequence> rest = takeRequestId(command.second);
389                        if (rest == null) {
390                                throw new FramsticksException().msg("failed to parse optional id and remainder");
391                        }
392
393                        if (keyword.equals("file")) {
394                                processFile(rest);
395                                return;
396                        }
397                        if (keyword.equals("ok") || keyword.equals("error")) {
398
399                                final SentQuery<?> sentQuery = sentQueries.fetch(rest.first, true);
400
401                                log.debug("parsing response for request {}", sentQuery);
402
403                                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles()));
404                                return;
405                        }
406                        if (keyword.equals("needfile")) {
407                                processNeedFile(rest);
408                                return;
409                        }
410
411                        throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword);
412                } catch (FramsticksException e) {
413                        throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e);
414                }
415        }
416
417        protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() {
418
419                @Override
420                public void handle(FramsticksException exception) {
421                        interruptJoinable();
422                        // finish();
423                }
424        };
425
426        @Override
427        protected void receiverThreadRoutine() {
428                startClientConnection(this);
429
430                sendQueryVersion(1, new FutureHandler<Void>(closeOnFailure) {
431
432                        @Override
433                        protected void result(Void result) {
434                                synchronized (applicationRequestsBuffer) {
435                                        isHandshakeDone = true;
436                                        for (Runnable r : applicationRequestsBuffer) {
437                                                r.run();
438                                        }
439                                        applicationRequestsBuffer.clear();
440                                }
441                        }
442                });
443
444                processInputBatchesUntilClosed();
445        }
446
447        protected void processNextInputBatch() {
448                processMessageStartingWith(getLine());
449        }
450
451        protected final Map<String, EventListener<File>> registeredListeners = new HashMap<>();
452
453        public <C> void addListener(String path, final EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
454                send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) {
455                        @Override
456                        protected void processOk(Response response) {
457                                synchronized (registeredListeners) {
458                                        registeredListeners.put(Path.validateString(response.getComment()), listener);
459                                }
460                                future.pass(null);
461                        }
462                });
463        }
464
465        public <C> void removeListener(EventListener<File> listener, final Dispatcher<C> dispatcher, final Future<Void> future) {
466                String eventPath = null;
467                synchronized (registeredListeners) {
468                        for (Map.Entry<String, EventListener<File>> e : registeredListeners.entrySet()) {
469                                if (e.getValue() == listener) {
470                                        eventPath = e.getKey();
471                                        break;
472                                }
473                        }
474                }
475                if (eventPath == null) {
476                        future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener));
477                        return;
478                }
479
480                final String finalEventPath = eventPath;
481                //TODO add arguments to the exception
482                send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) {
483
484                        @Override
485                        protected void processOk(Response response) {
486                                synchronized (registeredListeners) {
487                                        registeredListeners.remove(finalEventPath);
488                                }
489                                future.pass(null);
490                        }
491                });
492        }
493}
Note: See TracBrowser for help on using the repository browser.