package com.framsticks.communication; import com.framsticks.communication.queries.*; import com.framsticks.util.FramsticksException; import com.framsticks.util.lang.Holder; import com.framsticks.util.lang.Pair; import com.framsticks.util.lang.Strings; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import java.net.Socket; import com.framsticks.util.dispatching.RunAt; /** * @author Piotr Sniegowski */ public class ServerSideManagedConnection extends ManagedConnection { private final static Logger log = LogManager.getLogger(ServerSideManagedConnection.class); protected final RequestHandler requestHandler; public ServerSideManagedConnection(Socket socket, RequestHandler requestHandler) { setAddress(new Address(socket.getInetAddress().getHostAddress(), socket.getPort())); setDescription("server connection"); this.socket = socket; this.requestHandler = requestHandler; // socket.setSoTimeout(500); setupStreams(); } @Override protected void receiverThreadRoutine() { processInputBatchesUntilClosed(); } protected void handleRequest(Request request, ServerSideResponseFuture responseCallback) { if (request instanceof ApplicationRequest) { requestHandler.handle((ApplicationRequest) request, responseCallback); return; } if (request instanceof ProtocolRequest) { if (request instanceof VersionRequest) { responseCallback.pass(new Response(true, null)); return; } if (request instanceof UseRequest) { String feature = ((UseRequest)request).getFeature(); if (feature.equals("request_id")) { requestIdEnabled = true; responseCallback.pass(new Response(true, null)); return; } // if (feature.equals("call_empty_result")) { // responseCallback.pass(new Response(true, null)); // return; // } if (feature.equals("needfile_id")) { responseCallback.pass(new Response(true, null)); return; } responseCallback.pass(new Response(false, "unknown feature: " + feature)); return; } } log.error("unhandled request: {}", request); responseCallback.pass(new Response(false, "unhandled")); } protected final void respond(final Response response, final Integer id) { senderThread.dispatch(new RunAt(requestHandler) { @Override protected void runAt() { if (response.getFiles() != null) { for (File f : response.getFiles()) { putFile(f, id); } } StringBuilder statusLine = new StringBuilder(); statusLine.append(response.getOk() ? "ok" : "error").append(idToString(id)); if (Strings.notEmpty(response.getComment())) { Request.quoteValue(statusLine.append(" "), response.getComment()); } putLine(statusLine.toString()); flushOut(); } }); } protected void processNextInputBatch() { final Holder id = new Holder<>(); final String line = getLine(); try { Pair command = Request.takeIdentifier(line); final Pair rest = takeRequestId(command.second); id.set(rest.first); final Request request = Request.parse(command.first, rest.second); if (log.isTraceEnabled()) { log.trace("read request: {}", request); } handleRequest(request, new ServerSideResponseFuture() { @Override protected void result(Response response) { respond(response, rest.first); } }); } catch (FramsticksException e) { e.arg("id", id.get()).arg("line", line); log.error("error: ", e); respond(new Response(false, "invalid input: " + e.getMsg()), id.get()); return; } } }