package com.framsticks.remote; import com.framsticks.communication.*; import com.framsticks.communication.queries.CallRequest; import com.framsticks.communication.queries.GetRequest; import com.framsticks.communication.queries.InfoRequest; import com.framsticks.communication.queries.SetRequest; import com.framsticks.core.AbstractTree; import com.framsticks.core.Mode; import com.framsticks.core.TreeOperations; import com.framsticks.core.ListChange; import com.framsticks.core.Node; import com.framsticks.core.Path; import com.framsticks.params.*; import com.framsticks.params.annotations.AutoAppendAnnotation; import com.framsticks.params.annotations.FramsClassAnnotation; import com.framsticks.params.annotations.ParamAnnotation; import com.framsticks.params.types.EventParam; import com.framsticks.params.types.ProcedureParam; import com.framsticks.parsers.MultiParamLoader; import com.framsticks.core.Tree; import com.framsticks.util.*; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.Future; import com.framsticks.util.dispatching.FutureHandler; import com.framsticks.util.dispatching.Joinable; import com.framsticks.util.dispatching.JoinableParent; import com.framsticks.util.dispatching.JoinableState; import com.framsticks.util.lang.Casting; import com.framsticks.util.lang.Pair; import com.framsticks.util.dispatching.RunAt; import static com.framsticks.core.TreeOperations.*; import java.util.*; import javax.annotation.Nonnull; import org.apache.log4j.Logger; /** * @author Piotr Sniegowski */ @FramsClassAnnotation public class RemoteTree extends AbstractTree implements JoinableParent { private final static Logger log = Logger.getLogger(RemoteTree.class); protected ClientSideManagedConnection connection; protected final Set>> subscriptions = new HashSet<>(); public Pair> getSubscription(Path path) { for (Pair> s : subscriptions) { if (s.first.matches(path)) { return s; } } return null; } public RemoteTree() { } @ParamAnnotation public void setAddress(String address) { setConnection(Connection.to(new ClientSideManagedConnection(), new Address(address))); } @ParamAnnotation public String getAddress() { return connection == null ? "" : connection.getAddress().toString(); } @AutoAppendAnnotation public void setConnection(final ClientSideManagedConnection connection) { this.connection = connection; // final ExceptionResultHandler failure = new ExceptionResultHandler() { // @Override // public void handle(FramsticksException exception) { // log.fatal("failed to establish connection: ", exception); // // log.fatal("unsupported protocol version!\n minimal version is: " + "\nmanager protocol is: " + connection.getProtocolVersion()); // Dispatching.drop(connection, RemoteTree.this); // fireRun(exception); // } // }; } @Override public String toString() { assert Dispatching.isThreadSafe(); return "remote tree " + getName() + "(" + getAddress() + ")"; } public final ClientSideManagedConnection getConnection() { return connection; } @Override public void get(final Path path, final PrimitiveParam param, Mode mode, final Future future) { assert isActive(); assert param != null; assert path.isResolved(); //TODO only do that if needed connection.send(new GetRequest().field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { assert isActive(); TreeOperations.processFetchedValues(path, response.getFiles()); future.pass(bindAccess(path).get(param, Object.class)); } }); } protected final Map>> infoRequests = new HashMap>>(); @Override public void info(final Path path, final Future future) { final String name = path.getTop().getParam().getContainedTypeName(); if (infoRequests.containsKey(name)) { infoRequests.get(name).add(future); return; } log.debug("issuing info request for " + name); final Set> futures = new HashSet>(); futures.add(future); infoRequests.put(name, futures); final Future compositeFuture = new Future() { @Override public void handle(FramsticksException exception) { assert isActive(); infoRequests.remove(name); for (Future f : futures) { f.handle(exception); } } @Override protected void result(FramsClass framsClass) { assert isActive(); infoRequests.remove(name); for (Future f : futures) { f.pass(framsClass); } } }; //TODO: if the info is in the cache, then don't communicate connection.send(new InfoRequest().path(path.getTextual()), this, new ClientSideResponseFuture(compositeFuture) { @Override protected void processOk(Response response) { assert isActive(); if (response.getFiles().size() != 1) { throw new FramsticksException().msg("invalid number of files in response").arg("files", response.getFiles().size()); } if (!path.isTheSame(response.getFiles().get(0).getPath())) { throw new FramsticksException().msg("path mismatch").arg("returned path", response.getFiles().get(0).getPath()); } FramsClass framsClass = TreeOperations.processFetchedInfo(RemoteTree.this, response.getFiles().get(0)); CompositeParam thisParam = path.getTop().getParam(); if (!thisParam.isMatchingContainedName(framsClass.getId())) { throw new FramsticksException().msg("framsclass id mismatch").arg("request", thisParam.getContainedTypeName()).arg("fetched", framsClass.getId()); } compositeFuture.pass(framsClass); } }); } @Override public void get(final Path path, Mode mode, final Future future) { assert isActive(); assert path.getTop().getObject() != null; log.trace("fetching values for " + path); connection.send(new GetRequest().path(path.getTextual()), this, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { assert isActive(); TreeOperations.processFetchedValues(path, response.getFiles()); future.pass(path.getTopObject()); } }); } @Override public void resolve(final Path path, final Future future) { TreeOperations.resolve(path, future); } @Override protected void tryRegisterOnChangeEvents(final Path path) { assert isActive(); AccessInterface access = TreeOperations.bindAccess(path); if (!(access instanceof ListAccess)) { return; } assert path.size() >= 2; FramsClass underFramsClass = getInfoFromCache(path.getUnder().getParam().getContainedTypeName()); EventParam changedEvent; try { changedEvent = underFramsClass.getParamEntry(path.getTop().getParam().getId() + "_changed", EventParam.class); } catch (FramsticksException e) { return; } log.debug("registering for " + changedEvent); if (getSubscription(path) != null) { return; } final Pair> temporary = new Pair<>(path, null); subscriptions.add(temporary); connection.subscribe(path.getTextual() + "_changed", this, new SubscriptionCallback() { @Override public EventCallback subscribed(final Subscription subscription) { if (subscription == null) { log.error("failed to subscribe for change event for " + path); return null; } log.debug("subscribed for change event for " + path); // subscription.setDispatcher(RemoteInstance.this); RemoteTree.this.dispatch(new RunAt(this) { @Override protected void runAt() { subscriptions.remove(temporary); subscriptions.add(new Pair>(path, subscription)); } }); return new EventCallback() { @Override public void call(List files) { assert isActive(); assert files.size() == 1; final MultiParamLoader loader = new MultiParamLoader(); loader.setNewSource(files.get(0).getContent()); loader.addBreakCondition(MultiParamLoader.Status.AfterObject); loader.addListener(MultiParamLoader.Status.OnComment, new MultiParamLoader.StatusListener() { @Override public void onStatusChange() { throw new FramsticksException().msg("multi param loader error").arg("line", loader.getCurrentLine()); } }); ReflectionAccess access = new ReflectionAccess(ListChange.class, FramsClass.build().forClass(ListChange.class)); loader.addAccessInterface(access); MultiParamLoader.Status status; while ((status = loader.go()) != MultiParamLoader.Status.Finished) { if (status == MultiParamLoader.Status.AfterObject) { AccessInterface accessInterface = loader.getLastAccessInterface(); reactToChange(path, (ListChange) accessInterface.getSelected()); } } } }; } }); } protected Future futureListChanger(final ListChange listChange, final String path) { return new FutureHandler(Logging.logger(log, "failed to " + listChange, path)) { @Override protected void result(Path result) { log.debug(listChange + ": " + result); fireListChange(result, listChange); } }; } protected void reactToChange(final Path path, final ListChange listChange) { assert isActive(); log.debug("reacting to change " + listChange + " in " + path); AccessInterface access = TreeOperations.bindAccess(path); assert access != null; if ((listChange.getAction() == ListChange.Action.Modify) && (listChange.getPosition() == -1)) { final String p = path.getTextual(); TreeOperations.resolveAndGet(this, p, futureListChanger(listChange, p)); return; } CompositeParam childParam = Casting.tryCast(CompositeParam.class, access.getParam(listChange.getBestIdentifier())); assert childParam != null; switch (listChange.getAction()) { case Add: { final String p = path.getTextual() + "/" + childParam.getId(); TreeOperations.resolveAndGet(this, p, futureListChanger(listChange, p)); break; } case Remove: { access.set(childParam, null); fireListChange(path, listChange); break; } case Modify: { final String p = path.getTextual() + "/" + childParam.getId(); TreeOperations.resolveAndGet(this, p, futureListChanger(listChange, p)); break; } } } @Override public void set(final Path path, final PrimitiveParam param, final Object value, final Future future) { assert isActive(); final Integer flag = TreeOperations.bindAccess(path).set(param, value); log.trace("storing value " + param + " for " + path); //TODO break in passing exception handler is here connection.send(new SetRequest().value(value.toString()).field(param.getId()).path(path.getTextual()), this, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { future.pass(flag); } }); } @Override protected void joinableStart() { Dispatching.use(connection, this); super.joinableStart(); } @Override protected void joinableInterrupt() { Dispatching.drop(connection, this); super.joinableInterrupt(); } @Override protected void joinableFinish() { super.joinableFinish(); } @Override public void joinableJoin() throws InterruptedException { Dispatching.join(connection); super.joinableJoin(); } @Override public void childChangedState(Joinable joinable, JoinableState state) { proceedToState(state); } @Override public void call(@Nonnull final Path path, @Nonnull final ProcedureParam procedure, @Nonnull Object[] arguments, final Future future) { assert isActive(); assert path.isResolved(); //TODO validate arguments type using params connection.send(new CallRequest().procedure(procedure.getId()).arguments(Arrays.asList(arguments)).path(path.getTextual()), this, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { assert isActive(); // InstanceUtils.processFetchedValues(path, response.getFiles()); future.pass(null); } }); } @Override public Path create(Path path) { assert isActive(); assert !path.isResolved(); Path resolved = path.tryFindResolution(); if (!resolved.isResolved()) { log.debug("creating: " + path); AccessInterface access = registry.prepareAccess(path.getTop().getParam()); assert access != null; Object child = access.createAccessee(); assert child != null; if (path.size() == 1) { setRoot(new Node(getRoot().getParam(), child)); } else { bindAccess(this, path.getUnder()).set(path.getTop().getParam(), child); } resolved = path.appendResolution(child); } tryRegisterOnChangeEvents(resolved); return resolved; } }