[97] | 1 | package com.framsticks.communication; |
---|
| 2 | |
---|
| 3 | import com.framsticks.communication.queries.*; |
---|
| 4 | import com.framsticks.util.FramsticksException; |
---|
| 5 | import com.framsticks.util.lang.Holder; |
---|
| 6 | import com.framsticks.util.lang.Pair; |
---|
| 7 | import com.framsticks.util.lang.Strings; |
---|
| 8 | |
---|
[100] | 9 | import org.apache.logging.log4j.Logger; |
---|
| 10 | import org.apache.logging.log4j.LogManager; |
---|
[97] | 11 | |
---|
| 12 | import java.net.Socket; |
---|
| 13 | import com.framsticks.util.dispatching.RunAt; |
---|
| 14 | |
---|
| 15 | /** |
---|
| 16 | * @author Piotr Sniegowski |
---|
| 17 | */ |
---|
| 18 | public class ServerSideManagedConnection extends ManagedConnection { |
---|
| 19 | |
---|
[100] | 20 | private final static Logger log = LogManager.getLogger(ServerSideManagedConnection.class); |
---|
[97] | 21 | |
---|
[98] | 22 | protected final RequestHandler requestHandler; |
---|
[97] | 23 | |
---|
| 24 | public ServerSideManagedConnection(Socket socket, RequestHandler requestHandler) { |
---|
| 25 | setAddress(new Address(socket.getInetAddress().getHostAddress(), socket.getPort())); |
---|
| 26 | setDescription("server connection"); |
---|
| 27 | this.socket = socket; |
---|
| 28 | this.requestHandler = requestHandler; |
---|
| 29 | // socket.setSoTimeout(500); |
---|
| 30 | setupStreams(); |
---|
| 31 | } |
---|
| 32 | |
---|
| 33 | |
---|
| 34 | |
---|
| 35 | @Override |
---|
| 36 | protected void receiverThreadRoutine() { |
---|
| 37 | |
---|
| 38 | processInputBatchesUntilClosed(); |
---|
| 39 | } |
---|
| 40 | |
---|
| 41 | protected void handleRequest(Request request, ServerSideResponseFuture responseCallback) { |
---|
| 42 | if (request instanceof ApplicationRequest) { |
---|
| 43 | requestHandler.handle((ApplicationRequest) request, responseCallback); |
---|
| 44 | return; |
---|
| 45 | } |
---|
| 46 | if (request instanceof ProtocolRequest) { |
---|
| 47 | if (request instanceof VersionRequest) { |
---|
[105] | 48 | responseCallback.pass(new Response(true, null)); |
---|
[97] | 49 | return; |
---|
| 50 | } |
---|
| 51 | if (request instanceof UseRequest) { |
---|
| 52 | String feature = ((UseRequest)request).getFeature(); |
---|
| 53 | if (feature.equals("request_id")) { |
---|
| 54 | requestIdEnabled = true; |
---|
[105] | 55 | responseCallback.pass(new Response(true, null)); |
---|
[97] | 56 | return; |
---|
| 57 | } |
---|
[107] | 58 | // if (feature.equals("call_empty_result")) { |
---|
| 59 | // responseCallback.pass(new Response(true, null)); |
---|
| 60 | // return; |
---|
| 61 | // } |
---|
[103] | 62 | if (feature.equals("needfile_id")) { |
---|
[105] | 63 | responseCallback.pass(new Response(true, null)); |
---|
[103] | 64 | return; |
---|
| 65 | } |
---|
[105] | 66 | responseCallback.pass(new Response(false, "unknown feature: " + feature)); |
---|
[97] | 67 | return; |
---|
| 68 | } |
---|
| 69 | |
---|
| 70 | } |
---|
[100] | 71 | log.error("unhandled request: {}", request); |
---|
[105] | 72 | responseCallback.pass(new Response(false, "unhandled")); |
---|
[97] | 73 | } |
---|
| 74 | |
---|
[99] | 75 | |
---|
| 76 | |
---|
[97] | 77 | protected final void respond(final Response response, final Integer id) { |
---|
[98] | 78 | senderThread.dispatch(new RunAt<Connection>(requestHandler) { |
---|
[97] | 79 | @Override |
---|
| 80 | protected void runAt() { |
---|
| 81 | if (response.getFiles() != null) { |
---|
| 82 | for (File f : response.getFiles()) { |
---|
[101] | 83 | putFile(f, id); |
---|
[97] | 84 | } |
---|
| 85 | } |
---|
| 86 | StringBuilder statusLine = new StringBuilder(); |
---|
[101] | 87 | statusLine.append(response.getOk() ? "ok" : "error").append(idToString(id)); |
---|
[97] | 88 | if (Strings.notEmpty(response.getComment())) { |
---|
[99] | 89 | Request.quoteValue(statusLine.append(" "), response.getComment()); |
---|
[97] | 90 | } |
---|
| 91 | putLine(statusLine.toString()); |
---|
| 92 | flushOut(); |
---|
| 93 | } |
---|
| 94 | }); |
---|
| 95 | |
---|
| 96 | } |
---|
| 97 | |
---|
| 98 | |
---|
[102] | 99 | protected void processNextInputBatch() { |
---|
[97] | 100 | final Holder<Integer> id = new Holder<>(); |
---|
| 101 | final String line = getLine(); |
---|
| 102 | try { |
---|
| 103 | Pair<CharSequence, CharSequence> command = Request.takeIdentifier(line); |
---|
| 104 | final Pair<Integer, CharSequence> rest = takeRequestId(command.second); |
---|
| 105 | id.set(rest.first); |
---|
| 106 | |
---|
| 107 | final Request request = Request.parse(command.first, rest.second); |
---|
| 108 | |
---|
| 109 | if (log.isTraceEnabled()) { |
---|
[100] | 110 | log.trace("read request: {}", request); |
---|
[97] | 111 | } |
---|
| 112 | |
---|
| 113 | handleRequest(request, new ServerSideResponseFuture() { |
---|
| 114 | @Override |
---|
| 115 | protected void result(Response response) { |
---|
| 116 | respond(response, rest.first); |
---|
| 117 | } |
---|
| 118 | }); |
---|
| 119 | } catch (FramsticksException e) { |
---|
| 120 | e.arg("id", id.get()).arg("line", line); |
---|
| 121 | log.error("error: ", e); |
---|
[105] | 122 | respond(new Response(false, "invalid input: " + e.getMsg()), id.get()); |
---|
[97] | 123 | return; |
---|
| 124 | } |
---|
| 125 | |
---|
| 126 | } |
---|
| 127 | } |
---|