package com.framsticks.communication; import com.framsticks.communication.queries.ApplicationRequest; import com.framsticks.communication.queries.CallRequest; import com.framsticks.communication.queries.NeedFile; import com.framsticks.communication.queries.NeedFileAcceptor; import com.framsticks.communication.queries.ProtocolRequest; import com.framsticks.communication.queries.RegisterRequest; import com.framsticks.communication.queries.UseRequest; import com.framsticks.communication.queries.VersionRequest; import com.framsticks.core.Path; import com.framsticks.params.ListSource; import com.framsticks.util.*; import com.framsticks.util.dispatching.AtOnceDispatcher; import com.framsticks.util.dispatching.Dispatcher; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.ExceptionResultHandler; import com.framsticks.util.dispatching.Future; import com.framsticks.util.dispatching.FutureHandler; import com.framsticks.util.dispatching.JoinableState; import com.framsticks.util.lang.Casting; import com.framsticks.util.lang.Pair; import com.framsticks.util.lang.Strings; import com.framsticks.params.EventListener; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import java.util.*; import java.util.regex.Matcher; import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.framsticks.util.dispatching.RunAt; /** * @author Piotr Sniegowski */ public class ClientSideManagedConnection extends ManagedConnection { private final static Logger log = LogManager.getLogger(ClientSideManagedConnection.class); private final List applicationRequestsBuffer = new LinkedList<>(); private boolean isHandshakeDone = false; protected NeedFileAcceptor needFileAcceptor; /** * @return the needFileAcceptor */ public NeedFileAcceptor getNeedFileAcceptor() { return needFileAcceptor; } /** * @param needFileAcceptor the needFileAcceptor to set */ public void setNeedFileAcceptor(NeedFileAcceptor needFileAcceptor) { this.needFileAcceptor = needFileAcceptor; } /** * @return the requestedVersion */ public int getRequestedVersion() { return requestedVersion; } /** * @param requestedVersion the requestedVersion to set */ public void setRequestedVersion(int requestedVersion) { this.requestedVersion = requestedVersion; } protected int requestedVersion = 4; public ClientSideManagedConnection() { setDescription("client connection"); protocolVersion = -1; requestedFeatures.add("request_id"); requestedFeatures.add("call_empty_result"); requestedFeatures.add("needfile_id"); } protected List readFileContent() { List content = new LinkedList(); String line; boolean longValue = false; while (true) { line = getLine(); if (longValue) { if (line.endsWith("~") && !line.endsWith("\\~")) { longValue = false; } } else { if (line.equals("eof")) { break; } if (line.endsWith(":~")) { longValue = true; } } content.add(line); } return content; } private static class SentQuery { Request request; ClientSideResponseFuture callback; Dispatcher dispatcher; protected final List files = new ArrayList(); public List getFiles() { return files; } @Override public String toString() { return request.toString(); } public void dispatchResponseProcess(final Response response) { Dispatching.dispatchIfNotActive(dispatcher, new RunAt(callback) { @Override protected void runAt() { callback.pass(response); } }); } } public void send(ProtocolRequest request, ClientSideResponseFuture callback) { sendImplementation(request, AtOnceDispatcher.getInstance(), callback); } public void send(final ApplicationRequest request, final Dispatcher dispatcher, final ClientSideResponseFuture callback) { synchronized (applicationRequestsBuffer) { if (!isHandshakeDone) { applicationRequestsBuffer.add(new Runnable() { @Override public void run() { sendImplementation(request, dispatcher, callback); } }); return; } } sendImplementation(request, dispatcher, callback); } private void sendImplementation(Request request, Dispatcher dispatcher, ClientSideResponseFuture callback) { callback.setRequest(request); if (getState().ordinal() > JoinableState.RUNNING.ordinal()) { throw new FramsticksException().msg("connection is not connected").arg("connection", this); } final SentQuery sentQuery = new SentQuery(); sentQuery.request = request; sentQuery.callback = callback; sentQuery.dispatcher = dispatcher; senderThread.dispatch(new RunAt(callback) { @Override protected void runAt() { Integer id = sentQueries.put(null, sentQuery); String command = sentQuery.request.getCommand(); StringBuilder message = new StringBuilder(); message.append(command); if (id != null) { message.append(" ").append(id); } message.append(" "); sentQuery.request.construct(message); String out = message.toString(); putLine(out); flushOut(); log.debug("sending query: {}", out); } }); } @Override public String toString() { return "client connection " + address; } private void sendNextUseRequest(final Iterator featuresIterator, final Future future) { if (!featuresIterator.hasNext()) { future.pass(null); return; } final String feature = featuresIterator.next(); send(new UseRequest().feature(feature), new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { if (feature.equals("request_id")) { requestIdEnabled = true; } sendNextUseRequest(featuresIterator, future); } }); } private void sendQueryVersion(final int version, final Future future) { send(new VersionRequest().version(version), new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { protocolVersion = version; if (version < requestedVersion) { /** it is an implicit loop here*/ sendQueryVersion(version + 1, future); return; } sendNextUseRequest(requestedFeatures.iterator(), future); } }); } protected class IdCollection { protected final Map map = new HashMap<>(); protected T current; public Integer put(Integer idProposition, T value) { synchronized (ClientSideManagedConnection.this) { while (!(requestIdEnabled || current == null)) { try { ClientSideManagedConnection.this.wait(); } catch (InterruptedException ignored) { break; } } if (!requestIdEnabled) { current = value; return null; } if (idProposition == null) { idProposition = nextQueryId++; } map.put(idProposition, value); return idProposition; } } public void clear(Integer id) { if (requestIdEnabled) { current = null; } else { map.remove(id); } } public @Nonnull T fetch(@Nullable Integer id, boolean remove) { synchronized (ClientSideManagedConnection.this) { try { if (id == null) { if (requestIdEnabled) { throw new FramsticksException().msg("request_id is enabled and id is missing"); } T result = current; current = null; ClientSideManagedConnection.this.notifyAll(); return result; } if (!map.containsKey(id)) { throw new FramsticksException().msg("id is unknown").arg("id", id); } T result = map.get(id); if (remove) { map.remove(id); } return result; } catch (FramsticksException e) { throw new FramsticksException().msg("failed to match response to sent query").cause(e); } } } } protected IdCollection> sentQueries = new IdCollection<>(); protected IdCollection needFiles = new IdCollection<>(); private int nextQueryId = 0; protected void processEvent(String rest) { Matcher matcher = Request.EVENT_PATTERN.matcher(rest); if (!matcher.matches()) { throw new FramsticksException().msg("invalid event line").arg("rest", rest); } String fileLine = getLine(); if (!fileLine.equals("file")) { throw new FramsticksException().msg("expected file line").arg("got", fileLine); } String eventObjectPath = Strings.takeGroup(rest, matcher, 1).toString(); String eventCalleePath = Strings.takeGroup(rest, matcher, 2).toString(); final File file = new File("", new ListSource(readFileContent())); log.debug("firing event {}", eventObjectPath); EventListener listener; synchronized (registeredListeners) { listener = registeredListeners.get(eventObjectPath); } if (listener == null) { throw new FramsticksException().msg("failed to find registered event").arg("event path", eventObjectPath).arg("object", eventCalleePath); } listener.action(file); } protected void processNeedFile(Pair rest) { final Integer id = rest.first; String suggestedName = null; String description = null; Pair s = Request.takeString(rest.second); if (s != null) { suggestedName = s.first.toString(); Pair d = Request.takeString(s.second); if (d != null) { description = d.first.toString(); } } final Future future = new Future() { protected void send(final File result) { log.debug("sending file: " + result); needFiles.clear(id); sendFile(null, result, id, ClientSideManagedConnection.this); } @Override protected void result(File result) { send(result); } @Override public void handle(FramsticksException exception) { send(new File("", ListSource.createFrom("# invalid", "# " + exception.getMessage()))); } }; NeedFile needFile = new NeedFile(suggestedName, description, future); if (needFileAcceptor.acceptNeed(needFile)) { return; } future.handle(new FramsticksException().msg("acceptor did not accepted need")); } protected void processFile(Pair rest) { final SentQuery sentQuery = sentQueries.fetch(rest.first, false); String currentFilePath = rest.second.toString(); if (!Strings.notEmpty(currentFilePath)) { currentFilePath = Casting.throwCast(ApplicationRequest.class, sentQuery.request).getPath(); } sentQuery.files.add(new File(currentFilePath, new ListSource(readFileContent()))); } protected void processMessageStartingWith(final String header) { try { final Pair command = Request.takeIdentifier(header); if (command == null) { throw new FramsticksException().msg("failed to parse command"); } final CharSequence keyword = command.first; if (keyword.equals("event")) { processEvent(command.second.toString()); return; } final Pair rest = takeRequestId(command.second); if (rest == null) { throw new FramsticksException().msg("failed to parse optional id and remainder"); } if (keyword.equals("file")) { processFile(rest); return; } if (keyword.equals("ok") || keyword.equals("error")) { final SentQuery sentQuery = sentQueries.fetch(rest.first, true); log.debug("parsing response for request {}", sentQuery); sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second.toString(), sentQuery.getFiles())); return; } if (keyword.equals("needfile")) { processNeedFile(rest); return; } throw new FramsticksException().msg("unknown command keyword").arg("keyword", keyword); } catch (FramsticksException e) { throw new FramsticksException().msg("failed to process message").arg("starting with line", header).cause(e); } } protected final ExceptionResultHandler closeOnFailure = new ExceptionResultHandler() { @Override public void handle(FramsticksException exception) { interruptJoinable(); // finish(); } }; @Override protected void receiverThreadRoutine() { startClientConnection(this); sendQueryVersion(1, new FutureHandler(closeOnFailure) { @Override protected void result(Void result) { synchronized (applicationRequestsBuffer) { isHandshakeDone = true; for (Runnable r : applicationRequestsBuffer) { r.run(); } applicationRequestsBuffer.clear(); } } }); processInputBatchesUntilClosed(); } protected void processNextInputBatch() { processMessageStartingWith(getLine()); } protected final Map> registeredListeners = new HashMap<>(); public void addListener(String path, final EventListener listener, final Dispatcher dispatcher, final Future future) { send(new RegisterRequest().path(path), dispatcher, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { synchronized (registeredListeners) { registeredListeners.put(Path.validateString(response.getComment()), listener); } future.pass(null); } }); } public void removeListener(EventListener listener, final Dispatcher dispatcher, final Future future) { String eventPath = null; synchronized (registeredListeners) { for (Map.Entry> e : registeredListeners.entrySet()) { if (e.getValue() == listener) { eventPath = e.getKey(); break; } } } if (eventPath == null) { future.handle(new FramsticksException().msg("listener is not registered").arg("listener", listener)); return; } final String finalEventPath = eventPath; //TODO add arguments to the exception send(new CallRequest().procedure("remove").path(eventPath), dispatcher, new ClientSideResponseFuture(future) { @Override protected void processOk(Response response) { synchronized (registeredListeners) { registeredListeners.remove(finalEventPath); } future.pass(null); } }); } }