source: java/main/src/main/java/com/framsticks/remote/RemoteInstance.java @ 77

Last change on this file since 77 was 77, checked in by psniegowski, 11 years ago

Add new java codebase.

File size: 17.3 KB
Line 
1package com.framsticks.remote;
2
3import com.framsticks.communication.*;
4import com.framsticks.communication.queries.GetRequest;
5import com.framsticks.communication.queries.InfoRequest;
6import com.framsticks.communication.queries.SetRequest;
7import com.framsticks.communication.util.LoggingSubscriptionCallback;
8import com.framsticks.core.ListChange;
9import com.framsticks.core.Parameters;
10import com.framsticks.core.Path;
11import com.framsticks.params.*;
12import com.framsticks.params.types.CompositeParam;
13import com.framsticks.params.types.EventParam;
14import com.framsticks.parsers.MultiParamLoader;
15import com.framsticks.core.Instance;
16import com.framsticks.util.*;
17import org.apache.log4j.Logger;
18
19import java.util.*;
20
21/**
22 * @author Piotr Sniegowski
23 */
24public class RemoteInstance extends Instance {
25
26        private final static Logger LOGGER = Logger.getLogger(RemoteInstance.class.getName());
27
28    protected Path simulator;
29    protected final ClientConnection connection;
30
31    protected final Set<Pair<Path, Subscription>> subscriptions = new HashSet<Pair<Path, Subscription>>();
32
33    public Pair<Path, Subscription> getSubscription(Path path) {
34        for (Pair<Path, Subscription> s : subscriptions) {
35            if (s.first.matches(path)) {
36                return s;
37            }
38        }
39        return null;
40    }
41
42    @Override
43    public void run() {
44        assert isActive();
45        super.run();
46        connection.connect(new StateFunctor() {
47                        @Override
48                        public void call(Exception e) {
49                if (e != null) {
50                    fireRun(e);
51                    return;
52                }
53                                connection.negotiateProtocolVersion(new StateFunctor() {
54                                        @Override
55                                        public void call(Exception e) {
56                                                if (e != null) {
57                                                        LOGGER.fatal("unsupported protocol version!\n minimal version is: "
58                                                                        + "nmanager protocol is: "
59                                                                        + connection.getProtocolVersion());
60                                                        connection.close();
61                            fireRun(e);
62                                                        return;
63                                                }
64
65                        invokeLater(new Runnable() {
66                            @Override
67                            public void run() {
68                                resolveAndFetch("/simulator", new Future<Path>() {
69                                    @Override
70                                    public void result(Path path, Exception e) {
71                                        if (e != null) {
72                                            LOGGER.fatal("failed to resolve simulator node");
73                                            fireRun(e);
74                                            return;
75                                        }
76                                        assert isActive();
77                                        simulator = path;
78                                        fireRun(null);
79                                        LOGGER.info("resolved simulator node");
80
81                                        EventParam param = getParam(simulator, "running_changed", EventParam.class);
82                                        assert param != null;
83                                        connection.subscribe(simulator.getTextual() + "/" + param.getId(), new LoggingSubscriptionCallback(LOGGER, "server running state change", new EventCallback() {
84                                            @Override
85                                            public void call(List<File> files) {
86                                                invokeLater(new Runnable() {
87                                                    @Override
88                                                    public void run() {
89                                                        updateSimulationRunning();
90                                                    }
91                                                });
92                                            }
93                                        }));
94                                        new PeriodicTask(RemoteInstance.this, 1000) {
95                                            @Override
96                                            public void run() {
97                                                updateSimulationRunning();
98                                                again();
99                                            }
100                                        };
101                                    }
102                                });
103                            }
104                        });
105                                        }
106                                });
107                        }
108                });
109        }
110
111        public RemoteInstance(Parameters parameters) {
112        super(parameters);
113        connection = new ClientConnection(config.getString("address"));
114    }
115
116        @Override
117        public String toString() {
118        assert Dispatching.isThreadSafe();
119                return getConnection().getAddress();
120        }
121
122        public void setRunning(final boolean running) {
123        assert isActive();
124        //simulator.call(simulator.getParam(running ? "start" : "stop", ProcedureParam.class), new LoggingStateCallback(LOGGER, (running ? "starting" : "stopping") + " server"));
125        }
126
127        protected final UnaryListenersSet<Boolean> simulationRunningListeners = new UnaryListenersSet<Boolean>();
128
129        protected void updateSimulationRunning() {
130        assert isActive();
131        /*
132        fetchValue(simulator, getParam(simulator, "running", Param.class), new StateFunctor() {
133            @Override
134            public void call(Exception e) {
135                if (e != null) {
136                    LOGGER.fatal("failed to query simulator running status: " + e);
137                    return;
138                }
139
140                invokeLater(new Runnable() {
141                    @Override
142                    public void run() {
143                        boolean value = bindAccess(simulator).get("running", Boolean.class);
144                        LOGGER.trace("server running: " + value);
145                        simulationRunningListeners.call(value);
146                    }
147                });
148
149            }
150        });
151        */
152        }
153
154        public void addRunningStateListener(UnaryFunctor<Boolean, Boolean> listener) {
155        assert isActive();
156        simulationRunningListeners.add(listener);
157        }
158
159        public void disconnect() {
160        assert isActive();
161        if (connection.isConnected()) {
162                        connection.close();
163                }
164        }
165
166    public final ClientConnection getConnection() {
167        return connection;
168    }
169
170    @Override
171    public void fetchValue(final Path path, final Param param, final StateFunctor stateFunctor) {
172        assert isActive();
173        assert param != null;
174        assert path.isResolved();
175        connection.send(new GetRequest().setField(param.getId()).setPath(path.getTextual()), this, new ResponseCallback() {
176            @Override
177            public void process(Response response) {
178                assert isActive();
179                if (!response.getOk()) {
180                    stateFunctor.call(new Exception(response.getComment()));
181                    return;
182                }
183                try {
184                    processFetchedValues(path, response.getFiles());
185                    stateFunctor.call(null);
186                } catch (Exception ex) {
187                    stateFunctor.call(ex);
188                }
189            }
190        });
191    }
192
193    protected final Map<String, Set<Future<FramsClass>>> infoRequests = new HashMap<String, Set<Future<FramsClass>>>();
194
195    protected void finishInfoRequest(String id, FramsClass result, Exception e) {
196        assert isActive();
197        Set<Future<FramsClass>> futures = infoRequests.get(id);
198        infoRequests.remove(id);
199        for (Future<FramsClass> f : futures) {
200            f.result(result, e);
201        }
202    }
203
204    @Override
205    protected void fetchInfo(final Path path, final Future<FramsClass> future) {
206
207        final String name = path.getTop().getParam().getContainedTypeName();
208
209        if (infoRequests.containsKey(name)) {
210            infoRequests.get(name).add(future);
211            return;
212        }
213
214        LOGGER.debug("issuing info request for " + name);
215        Set<Future<FramsClass>> futures = new HashSet<Future<FramsClass>>();
216        futures.add(future);
217        infoRequests.put(name, futures);
218
219        //TODO: if the info is in the cache, then don't communicate
220        connection.send(new InfoRequest().setPath(path.getTextual()), this, new ResponseCallback() {
221            @Override
222            public void process(Response response) {
223                assert isActive();
224                if (!response.getOk()) {
225                    finishInfoRequest(name, null, new Exception(response.getComment()));
226                    return;
227                }
228
229                assert response.getFiles().size() == 1;
230                assert path.isTheSame(response.getFiles().get(0).getPath());
231                FramsClass framsClass = processFetchedInfo(response.getFiles().get(0));
232
233                if (framsClass == null) {
234                    LOGGER.fatal("could not read class info");
235                    finishInfoRequest(name, null, new Exception("could not read class info"));
236                    return;
237                }
238                CompositeParam thisParam = path.getTop().getParam();
239                if (!thisParam.isMatchingContainedName(framsClass.getId())) {
240                    String mismatch = "class name mismatch: param=" + thisParam.getContainedTypeName() + " differs from fetched=" + framsClass.getId();
241                    LOGGER.error(mismatch);
242                    finishInfoRequest(name, null, new Exception(mismatch));
243                    return;
244                }
245                finishInfoRequest(name, framsClass, null);
246            }
247        });
248    }
249
250    @Override
251    public void fetchValues(final Path path, final StateFunctor stateFunctor) {
252        assert isActive();
253        assert path.getTop().getObject() != null;
254
255        LOGGER.trace("fetching values for " + path);
256        connection.send(new GetRequest().setPath(path.getTextual()), this, new ResponseCallback() {
257            @Override
258            public void process(Response response) {
259                assert isActive();
260                if (!response.getOk()) {
261                    stateFunctor.call(new Exception(response.getComment()));
262                    return;
263                }
264                try {
265                    processFetchedValues(path, response.getFiles());
266                    stateFunctor.call(null);
267                } catch (Exception ex) {
268                    LOGGER.error("an exception occurred while loading: " + ex);
269                    ex.printStackTrace();
270                    stateFunctor.call(ex);
271                }
272            }
273        });
274    }
275
276    @Override
277    public void resolve(final Path path, final Future<Path> future) {
278        assert isActive();
279        if (path.getTop().getObject() != null) {
280            if (getInfoFromCache(path) != null) {
281                future.result(path, null);
282                return;
283            }
284            findInfo(path, new Future<FramsClass>() {
285                @Override
286                public void result(FramsClass result, Exception e) {
287                    if (e != null) {
288                        future.result(null, e);
289                        return;
290                    }
291                    future.result(path, null);
292                }
293            });
294            return;
295        }
296        findInfo(path, new Future<FramsClass>() {
297            @Override
298            public void result(FramsClass result, Exception e) {
299                assert isActive();
300                if (e != null) {
301                    future.result(null, e);
302                    return;
303                }
304                assert path.getTop().getParam().isMatchingContainedName(result.getId());
305                Path p = (path.getTop().getParam().getContainedTypeName() != null ? path : new Path(path.getInstance(), path.getTextual()));
306                future.result(create(p), null);
307            }
308        });
309    }
310
311    @Override
312    protected void tryRegisterOnChangeEvents(final Path path) {
313        assert isActive();
314        AccessInterface access = bindAccess(path);
315        if (!(access instanceof ListAccess)) {
316            return;
317        }
318
319
320        assert path.size() >= 2;
321        FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName());
322
323        EventParam changedEvent = Casting.tryCast(EventParam.class, underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed"));
324        if (changedEvent == null) {
325            return;
326        }
327
328        if (getSubscription(path) != null) {
329            return;
330        }
331
332        final Pair<Path, Subscription> temporary = new Pair<Path, Subscription>(path, null);
333        subscriptions.add(temporary);
334
335        connection.subscribe(path.getTextual() + "_changed", new SubscriptionCallback() {
336            @Override
337            public EventCallback subscribed(final Subscription subscription) {
338                if (subscription == null) {
339                    LOGGER.error("failed to subscribe for change event for " + path);
340                    return null;
341                }
342                LOGGER.debug("subscribed for change event for " + path);
343                subscription.setDispatcher(RemoteInstance.this);
344                RemoteInstance.this.invokeLater(new Runnable() {
345                    @Override
346                    public void run() {
347                        subscriptions.remove(temporary);
348                        subscriptions.add(new Pair<Path, Subscription>(path, subscription));
349                    }
350                });
351                return new EventCallback() {
352                    @Override
353                    public void call(List<File> files) {
354                        assert isActive();
355                        assert files.size() == 1;
356                        MultiParamLoader loader = new MultiParamLoader();
357                        loader.setNewSource(files.get(0).getContent());
358                        loader.addBreakCondition(MultiParamLoader.Status.AfterObject);
359                        ReflectionAccess access = new ReflectionAccess(ListChange.class, ListChange.getFramsClass());
360                        loader.addAccessInterface(access);
361                        MultiParamLoader.Status status;
362                        try {
363                            while ((status = loader.go()) != MultiParamLoader.Status.Finished) {
364                                if (status == MultiParamLoader.Status.AfterObject) {
365                                    AccessInterface accessInterface = loader.getLastAccessInterface();
366                                    reactToChange(path, (ListChange) accessInterface.getSelected());
367                                }
368                            }
369                        } catch (Exception e) {
370                            e.printStackTrace();
371                        }
372                    }
373                };
374            }
375        });
376    }
377
378
379    protected void reactToChange(final Path path, final ListChange listChange) {
380        assert isActive();
381        LOGGER.debug("reacting to change " + listChange + " in " + path);
382                AccessInterface access = bindAccess(path);
383                assert access != null;
384
385                if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) {
386                        final String p = path.getTextual();
387                        resolveAndFetch(p, new Future<Path>() {
388                                @Override
389                                public void result(Path result, Exception e) {
390                                        if (e != null) {
391                                                LOGGER.error("failed to modify " + p + ": " + e);
392                                                return;
393                                        }
394                                        fireListChange(path, listChange);
395                                }
396                        });
397                        return;
398                }
399
400
401        CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier()));
402        assert childParam != null;
403        switch (listChange.getAction()) {
404            case Add: {
405                final String p = path.getTextual() + "/" + childParam.getId();
406                resolveAndFetch(p, new Future<Path>() {
407                    @Override
408                    public void result(Path result, Exception e) {
409                        if (e != null) {
410                            LOGGER.error("failed to add " + p + ": " + e);
411                            return;
412                        }
413                        LOGGER.debug("added: " + result);
414                        fireListChange(path, listChange);
415                    }
416                });
417                break;
418            }
419            case Remove: {
420                access.set(childParam, null);
421                fireListChange(path, listChange);
422                break;
423            }
424            case Modify: {
425                final String p = path.getTextual() + "/" + childParam.getId();
426                resolveAndFetch(p, new Future<Path>() {
427                    @Override
428                    public void result(Path result, Exception e) {
429                        if (e != null) {
430                            LOGGER.error("failed to modify " + p + ": " + e);
431                            return;
432                        }
433                        fireListChange(path, listChange);
434                    }
435                });
436                break;
437            }
438        }
439    }
440
441        @Override
442        public void storeValue(final Path path, final Param param, final Object value, final StateFunctor stateFunctor) {
443                assert isActive();
444
445                LOGGER.trace("storing value " + param + " for " + path);
446                connection.send(new SetRequest().value(value.toString()).setField(param.getId()).setPath(path.getTextual()), this, new StateCallback() {
447                        @Override
448                        public void call(Exception e) {
449                                if (e == null) {
450                                        bindAccess(path).set(param, value);
451                                }
452                                stateFunctor.call(e);
453                        }
454                });
455        }
456}
Note: See TracBrowser for help on using the repository browser.