[77] | 1 | package com.framsticks.communication; |
---|
| 2 | |
---|
| 3 | import com.framsticks.communication.queries.ApplicationRequest; |
---|
| 4 | import com.framsticks.communication.queries.RegistrationRequest; |
---|
| 5 | import com.framsticks.communication.queries.UseRequest; |
---|
| 6 | import com.framsticks.communication.queries.VersionRequest; |
---|
| 7 | import com.framsticks.communication.util.LoggingStateCallback; |
---|
| 8 | import com.framsticks.params.ListSource; |
---|
| 9 | import com.framsticks.util.*; |
---|
| 10 | import org.apache.log4j.Logger; |
---|
| 11 | |
---|
| 12 | import java.io.BufferedReader; |
---|
| 13 | import java.io.IOException; |
---|
| 14 | import java.io.InputStreamReader; |
---|
| 15 | import java.io.PrintWriter; |
---|
| 16 | import java.net.Socket; |
---|
| 17 | import java.net.SocketException; |
---|
| 18 | import java.util.*; |
---|
| 19 | import java.util.regex.Matcher; |
---|
| 20 | import java.util.regex.Pattern; |
---|
| 21 | |
---|
| 22 | /** |
---|
| 23 | * @author Piotr Sniegowski |
---|
| 24 | */ |
---|
| 25 | public class ClientConnection extends Connection { |
---|
| 26 | |
---|
| 27 | private final static Logger LOGGER = Logger.getLogger(ClientConnection.class); |
---|
| 28 | |
---|
| 29 | protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>(); |
---|
| 30 | |
---|
| 31 | public String getAddress() { |
---|
| 32 | return address; |
---|
| 33 | } |
---|
| 34 | |
---|
| 35 | public void connect(StateFunctor connectedFunctor) { |
---|
| 36 | try { |
---|
| 37 | LOGGER.info("connecting to " + address); |
---|
| 38 | |
---|
| 39 | socket = new Socket(hostName, port); |
---|
| 40 | |
---|
| 41 | socket.setSoTimeout(500); |
---|
| 42 | |
---|
| 43 | LOGGER.info("connected to " + hostName + ":" + port); |
---|
| 44 | connected = true; |
---|
| 45 | |
---|
| 46 | runThreads(); |
---|
| 47 | |
---|
| 48 | connectedFunctor.call(null); |
---|
| 49 | } catch (SocketException e) { |
---|
| 50 | LOGGER.error("failed to connect: " + e); |
---|
| 51 | connectedFunctor.call(e); |
---|
| 52 | } catch (IOException e) { |
---|
| 53 | LOGGER.error("buffer creation failure"); |
---|
| 54 | connectedFunctor.call(e); |
---|
| 55 | close(); |
---|
| 56 | } |
---|
| 57 | } |
---|
| 58 | |
---|
| 59 | private static abstract class InboundMessage { |
---|
| 60 | protected String currentFilePath; |
---|
| 61 | protected List<String> currentFileContent; |
---|
| 62 | protected final List<File> files = new ArrayList<File>(); |
---|
| 63 | |
---|
| 64 | public abstract void eof(); |
---|
| 65 | |
---|
| 66 | protected void initCurrentFile(String path) { |
---|
| 67 | currentFileContent = new LinkedList<String>(); |
---|
| 68 | currentFilePath = path; |
---|
| 69 | } |
---|
| 70 | protected void finishCurrentFile() { |
---|
| 71 | if (currentFileContent == null) { |
---|
| 72 | return; |
---|
| 73 | } |
---|
| 74 | files.add(new File(currentFilePath, new ListSource(currentFileContent))); |
---|
| 75 | currentFilePath = null; |
---|
| 76 | currentFileContent= null; |
---|
| 77 | } |
---|
| 78 | |
---|
| 79 | public abstract void startFile(String path); |
---|
| 80 | |
---|
| 81 | public void addLine(String line) { |
---|
| 82 | assert line != null; |
---|
| 83 | assert currentFileContent != null; |
---|
| 84 | currentFileContent.add(line.substring(0, line.length() - 1)); |
---|
| 85 | } |
---|
| 86 | |
---|
| 87 | public List<File> getFiles() { |
---|
| 88 | return files; |
---|
| 89 | } |
---|
| 90 | } |
---|
| 91 | |
---|
| 92 | private static class EventFire extends InboundMessage { |
---|
| 93 | public final Subscription subscription; |
---|
| 94 | |
---|
| 95 | private EventFire(Subscription subscription) { |
---|
| 96 | this.subscription = subscription; |
---|
| 97 | } |
---|
| 98 | |
---|
| 99 | public void startFile(String path) { |
---|
| 100 | assert path == null; |
---|
| 101 | initCurrentFile(null); |
---|
| 102 | } |
---|
| 103 | |
---|
| 104 | @Override |
---|
| 105 | public void eof() { |
---|
| 106 | finishCurrentFile(); |
---|
| 107 | Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { |
---|
| 108 | @Override |
---|
| 109 | public void run() { |
---|
| 110 | subscription.getEventCallback().call(getFiles()); |
---|
| 111 | } |
---|
| 112 | }); |
---|
| 113 | } |
---|
| 114 | } |
---|
| 115 | |
---|
| 116 | private static class SentQuery extends InboundMessage { |
---|
| 117 | Request request; |
---|
| 118 | ResponseCallback callback; |
---|
| 119 | Dispatcher dispatcher; |
---|
| 120 | |
---|
| 121 | public void startFile(String path) { |
---|
| 122 | finishCurrentFile(); |
---|
| 123 | if (path == null) { |
---|
| 124 | assert request instanceof ApplicationRequest; |
---|
| 125 | path = ((ApplicationRequest)request).getPath(); |
---|
| 126 | } |
---|
| 127 | initCurrentFile(path); |
---|
| 128 | } |
---|
| 129 | |
---|
| 130 | public void eof() { |
---|
| 131 | finishCurrentFile(); |
---|
| 132 | //no-operation |
---|
| 133 | } |
---|
| 134 | |
---|
| 135 | @Override |
---|
| 136 | public String toString() { |
---|
| 137 | return request.toString(); |
---|
| 138 | } |
---|
| 139 | } |
---|
| 140 | private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>(); |
---|
| 141 | |
---|
| 142 | |
---|
| 143 | protected final String address; |
---|
| 144 | protected final String hostName; |
---|
| 145 | protected final int port; |
---|
| 146 | |
---|
| 147 | private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$"); |
---|
| 148 | |
---|
| 149 | public ClientConnection(String address) { |
---|
| 150 | assert address != null; |
---|
| 151 | this.address = address; |
---|
| 152 | Matcher matcher = addressPattern.matcher(address); |
---|
| 153 | if (!matcher.matches()) { |
---|
| 154 | LOGGER.fatal("invalid address: " + address); |
---|
| 155 | hostName = null; |
---|
| 156 | port = 0; |
---|
| 157 | return; |
---|
| 158 | } |
---|
| 159 | hostName = matcher.group(1); |
---|
| 160 | port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009; |
---|
| 161 | } |
---|
| 162 | |
---|
| 163 | private SentQuery currentlySentQuery; |
---|
| 164 | |
---|
| 165 | public void send(Request request, ResponseCallback callback) { |
---|
| 166 | send(request, AtOnceDispatcher.instance, callback); |
---|
| 167 | } |
---|
| 168 | |
---|
| 169 | public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { |
---|
| 170 | |
---|
| 171 | if (!isConnected()) { |
---|
| 172 | LOGGER.fatal("not connected"); |
---|
| 173 | return; |
---|
| 174 | } |
---|
| 175 | final SentQuery sentQuery = new SentQuery(); |
---|
| 176 | sentQuery.request = request; |
---|
| 177 | sentQuery.callback = callback; |
---|
| 178 | sentQuery.dispatcher = dispatcher; |
---|
| 179 | |
---|
| 180 | senderThread.invokeLater(new Runnable(){ |
---|
| 181 | @Override |
---|
| 182 | public void run() { |
---|
| 183 | synchronized (ClientConnection.this) { |
---|
| 184 | |
---|
| 185 | while (!(requestIdEnabled || currentlySentQuery == null)) { |
---|
| 186 | try { |
---|
| 187 | ClientConnection.this.wait(); |
---|
| 188 | } catch (InterruptedException ignored) { |
---|
| 189 | break; |
---|
| 190 | } |
---|
| 191 | } |
---|
| 192 | } |
---|
| 193 | Integer id = stashQuery(sentQuery); |
---|
| 194 | String command = sentQuery.request.getCommand(); |
---|
| 195 | StringBuilder message = new StringBuilder(); |
---|
| 196 | message.append(command); |
---|
| 197 | if (id != null) { |
---|
| 198 | message.append(" ").append(id); |
---|
| 199 | } |
---|
| 200 | sentQuery.request.construct(message); |
---|
| 201 | String out = message.toString(); |
---|
| 202 | |
---|
| 203 | output.println(out); |
---|
| 204 | LOGGER.debug("sending query: " + out); |
---|
| 205 | |
---|
| 206 | } |
---|
| 207 | }); |
---|
| 208 | /* |
---|
| 209 | synchronized (this) { |
---|
| 210 | LOGGER.debug("queueing query: " + query); |
---|
| 211 | queryQueue.offer(sentQuery); |
---|
| 212 | notifyAll(); |
---|
| 213 | } |
---|
| 214 | */ |
---|
| 215 | } |
---|
| 216 | |
---|
| 217 | |
---|
| 218 | @Override |
---|
| 219 | public String toString() { |
---|
| 220 | return address; |
---|
| 221 | } |
---|
| 222 | |
---|
| 223 | public void subscribe(final String path, final SubscriptionCallback callback) { |
---|
| 224 | send(new RegistrationRequest().setPath(path), new ResponseCallback() { |
---|
| 225 | @Override |
---|
| 226 | public void process(Response response) { |
---|
| 227 | if (!response.getOk()) { |
---|
| 228 | LOGGER.error("failed to register on event: " + path); |
---|
| 229 | callback.subscribed(null); |
---|
| 230 | return; |
---|
| 231 | } |
---|
| 232 | assert response.getFiles().isEmpty(); |
---|
| 233 | Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment()); |
---|
| 234 | LOGGER.debug("registered on event: " + subscription); |
---|
| 235 | synchronized (subscriptions) { |
---|
| 236 | subscriptions.put(subscription.getRegisteredPath(), subscription); |
---|
| 237 | } |
---|
| 238 | subscription.setEventCallback(callback.subscribed(subscription)); |
---|
| 239 | if (subscription.getEventCallback() == null) { |
---|
| 240 | LOGGER.info("subscription for " + path + " aborted"); |
---|
| 241 | subscription.unsubscribe(new LoggingStateCallback(LOGGER, "abort subscription")); |
---|
| 242 | } |
---|
| 243 | } |
---|
| 244 | }); |
---|
| 245 | } |
---|
| 246 | |
---|
| 247 | public void negotiateProtocolVersion(StateFunctor stateFunctor) { |
---|
| 248 | protocolVersion = -1; |
---|
| 249 | sendQueryVersion(1, stateFunctor); |
---|
| 250 | } |
---|
| 251 | |
---|
| 252 | public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { |
---|
| 253 | send(new VersionRequest().version(version), new StateCallback() { |
---|
| 254 | @Override |
---|
| 255 | public void call(Exception e) { |
---|
| 256 | if (e != null) { |
---|
| 257 | LOGGER.fatal("failed to upgrade protocol to version: " + version); |
---|
| 258 | return; |
---|
| 259 | } |
---|
| 260 | protocolVersion = version; |
---|
| 261 | if (version < 4) { |
---|
| 262 | /** it is an implicit loop here*/ |
---|
| 263 | sendQueryVersion(version + 1, stateFunctor); |
---|
| 264 | return; |
---|
| 265 | } |
---|
| 266 | send(new UseRequest().feature("request_id"), new StateCallback() { |
---|
| 267 | @Override |
---|
| 268 | public void call(Exception e) { |
---|
| 269 | requestIdEnabled = e == null; |
---|
| 270 | /* |
---|
| 271 | synchronized (ClientConnection.this) { |
---|
| 272 | ClientConnection.this.notifyAll(); |
---|
| 273 | } |
---|
| 274 | */ |
---|
| 275 | if (!requestIdEnabled) { |
---|
| 276 | LOGGER.fatal("protocol negotiation failed"); |
---|
| 277 | stateFunctor.call(new Exception("protocol negotiation failed", e)); |
---|
| 278 | return; |
---|
| 279 | } |
---|
| 280 | stateFunctor.call(null); |
---|
| 281 | } |
---|
| 282 | }); |
---|
| 283 | |
---|
| 284 | } |
---|
| 285 | }); |
---|
| 286 | } |
---|
| 287 | |
---|
| 288 | |
---|
| 289 | private synchronized SentQuery fetchQuery(Integer id, boolean remove) { |
---|
| 290 | if (id == null) { |
---|
| 291 | if (requestIdEnabled) { |
---|
| 292 | return null; |
---|
| 293 | } |
---|
| 294 | SentQuery result = currentlySentQuery; |
---|
| 295 | if (remove) { |
---|
| 296 | currentlySentQuery = null; |
---|
| 297 | notifyAll(); |
---|
| 298 | } |
---|
| 299 | return result; |
---|
| 300 | } |
---|
| 301 | if (queryMap.containsKey(id)) { |
---|
| 302 | SentQuery result = queryMap.get(id); |
---|
| 303 | if (remove) { |
---|
| 304 | queryMap.remove(id); |
---|
| 305 | } |
---|
| 306 | return result; |
---|
| 307 | } |
---|
| 308 | return null; |
---|
| 309 | } |
---|
| 310 | |
---|
| 311 | private int nextQueryId = 0; |
---|
| 312 | |
---|
| 313 | private Integer stashQuery(SentQuery sentQuery) { |
---|
| 314 | if (!requestIdEnabled) { |
---|
| 315 | currentlySentQuery = sentQuery; |
---|
| 316 | return null; |
---|
| 317 | } |
---|
| 318 | queryMap.put(nextQueryId, sentQuery); |
---|
| 319 | return nextQueryId++; |
---|
| 320 | } |
---|
| 321 | |
---|
| 322 | protected void processMessage(InboundMessage inboundMessage) throws Exception { |
---|
| 323 | if (inboundMessage == null) { |
---|
| 324 | LOGGER.error("failed to use any inbound message"); |
---|
| 325 | return; |
---|
| 326 | } |
---|
| 327 | |
---|
| 328 | String line; |
---|
| 329 | while (!(line = getLine()).startsWith("eof")) { |
---|
| 330 | inboundMessage.addLine(line); |
---|
| 331 | } |
---|
| 332 | inboundMessage.eof(); |
---|
| 333 | } |
---|
| 334 | |
---|
| 335 | protected void processEvent(String rest) throws Exception { |
---|
| 336 | Matcher matcher = eventPattern.matcher(rest); |
---|
| 337 | if (!matcher.matches()) { |
---|
| 338 | LOGGER.error("invalid event line: " + rest); |
---|
| 339 | return; |
---|
| 340 | } |
---|
| 341 | Subscription subscription = subscriptions.get(matcher.group(1)); |
---|
| 342 | if (subscription == null) { |
---|
| 343 | LOGGER.error("non subscribed event: " + matcher.group(1)); |
---|
| 344 | return; |
---|
| 345 | } |
---|
| 346 | EventFire event = new EventFire(subscription); |
---|
| 347 | event.startFile(null); |
---|
| 348 | processMessage(event); |
---|
| 349 | } |
---|
| 350 | |
---|
| 351 | |
---|
| 352 | protected void processMessageStartingWith(String line) throws Exception { |
---|
| 353 | Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n"); |
---|
| 354 | if (command.first.equals("event")) { |
---|
| 355 | processEvent(command.second); |
---|
| 356 | return; |
---|
| 357 | } |
---|
| 358 | Pair<Integer, String> rest = parseRest(command.second); |
---|
| 359 | |
---|
| 360 | if (command.first.equals("file")) { |
---|
| 361 | SentQuery sentQuery = fetchQuery(rest.first, false); |
---|
| 362 | sentQuery.startFile(rest.second); |
---|
| 363 | processMessage(sentQuery); |
---|
| 364 | return; |
---|
| 365 | } |
---|
| 366 | |
---|
| 367 | SentQuery sentQuery = fetchQuery(rest.first, true); |
---|
| 368 | if (sentQuery == null) { |
---|
| 369 | return; |
---|
| 370 | } |
---|
| 371 | LOGGER.debug("parsing response for request " + sentQuery); |
---|
| 372 | |
---|
| 373 | final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); |
---|
| 374 | final ResponseCallback callback = sentQuery.callback; |
---|
| 375 | |
---|
| 376 | Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { |
---|
| 377 | @Override |
---|
| 378 | public void run() { |
---|
| 379 | callback.process(response); |
---|
| 380 | } |
---|
| 381 | }); |
---|
| 382 | } |
---|
| 383 | |
---|
| 384 | @Override |
---|
| 385 | protected void receiverThreadRoutine() throws Exception { |
---|
| 386 | while (connected) { |
---|
| 387 | processMessageStartingWith(getLine()); |
---|
| 388 | } |
---|
| 389 | } |
---|
| 390 | |
---|
| 391 | } |
---|