Changeset 85 for java/main/src/main/java/com/framsticks/communication
- Timestamp:
- 06/24/13 13:38:40 (11 years ago)
- Location:
- java/main/src/main/java/com/framsticks/communication
- Files:
-
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
java/main/src/main/java/com/framsticks/communication/ClientConnection.java
r84 r85 21 21 import java.util.regex.Matcher; 22 22 import java.util.regex.Pattern; 23 import com.framsticks.util.dispatching.RunAt; 23 24 24 25 /** … … 29 30 private final static Logger log = Logger.getLogger(ClientConnection.class); 30 31 31 protected final Map<String, Subscription > subscriptions = new HashMap<String, Subscription>();32 protected final Map<String, Subscription<?>> subscriptions = new HashMap<>(); 32 33 33 34 public String getAddress() { … … 93 94 94 95 private static class EventFire extends InboundMessage { 95 public final Subscription subscription;96 97 private EventFire(Subscription subscription) {96 public final Subscription<?> subscription; 97 98 private EventFire(Subscription<?> subscription) { 98 99 this.subscription = subscription; 99 100 } … … 107 108 public void eof() { 108 109 finishCurrentFile(); 109 Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() { 110 @Override 111 public void run() { 112 subscription.getEventCallback().call(getFiles()); 113 } 114 }); 115 } 116 } 117 118 private static class SentQuery extends InboundMessage { 110 111 subscription.dispatchCall(getFiles()); 112 } 113 } 114 115 private static class SentQuery<C> extends InboundMessage { 119 116 Request request; 120 ResponseCallback callback;121 Dispatcher dispatcher;117 ResponseCallback<? extends C> callback; 118 Dispatcher<C> dispatcher; 122 119 123 120 public void startFile(String path) { … … 139 136 return request.toString(); 140 137 } 141 } 142 private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>(); 138 139 public void dispatchResponseProcess(final Response response) { 140 Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() { 141 @Override 142 public void run() { 143 callback.process(response); 144 } 145 }); 146 } 147 } 148 private Map<Integer, SentQuery<?>> queryMap = new HashMap<>(); 143 149 144 150 … … 163 169 } 164 170 165 private SentQuery currentlySentQuery; 166 167 public void send(Request request, ResponseCallback callback) { 168 send(request, AtOnceDispatcher.instance, callback); 169 } 170 171 public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) { 171 private SentQuery<?> currentlySentQuery; 172 173 174 public <C extends Connection> void send(Request request, ResponseCallback<C> callback) { 175 //TODO RunAt 176 send(request, AtOnceDispatcher.getInstance(), callback); 177 } 178 179 public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) { 172 180 173 181 if (!isConnected()) { … … 175 183 return; 176 184 } 177 final SentQuery sentQuery = new SentQuery();185 final SentQuery<C> sentQuery = new SentQuery<C>(); 178 186 sentQuery.request = request; 179 187 sentQuery.callback = callback; 180 188 sentQuery.dispatcher = dispatcher; 181 189 182 senderThread.invokeLater(new Run nable(){190 senderThread.invokeLater(new RunAt<Connection>(){ 183 191 @Override 184 192 public void run() { 193 Integer id; 185 194 synchronized (ClientConnection.this) { 186 195 … … 192 201 } 193 202 } 194 } 195 Integer id = stashQuery(sentQuery); 203 if (requestIdEnabled) { 204 queryMap.put(nextQueryId, sentQuery); 205 id = nextQueryId++; 206 } else { 207 currentlySentQuery = sentQuery; 208 id = null; 209 } 210 } 196 211 String command = sentQuery.request.getCommand(); 197 212 StringBuilder message = new StringBuilder(); … … 223 238 } 224 239 225 public void subscribe(final String path, final SubscriptionCallbackcallback) {226 send(new RegistrationRequest(). setPath(path), new ResponseCallback() {240 public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) { 241 send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() { 227 242 @Override 228 243 public void process(Response response) { … … 233 248 } 234 249 assert response.getFiles().isEmpty(); 235 Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment());250 Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher); 236 251 log.debug("registered on event: " + subscription); 237 252 synchronized (subscriptions) { … … 241 256 if (subscription.getEventCallback() == null) { 242 257 log.info("subscription for " + path + " aborted"); 243 subscription.unsubscribe(new LoggingStateCallback (log, "abort subscription"));258 subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription")); 244 259 } 245 260 } … … 253 268 254 269 public void sendQueryVersion(final int version, final StateFunctor stateFunctor) { 255 send(new VersionRequest().version(version), new StateCallback () {270 send(new VersionRequest().version(version), new StateCallback<Connection>() { 256 271 @Override 257 272 public void call(Exception e) { … … 266 281 return; 267 282 } 268 send(new UseRequest().feature("request_id"), new StateCallback () {283 send(new UseRequest().feature("request_id"), new StateCallback<Connection>() { 269 284 @Override 270 285 public void call(Exception e) { … … 289 304 290 305 291 private synchronized SentQuery fetchQuery(Integer id, boolean remove) {306 private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) { 292 307 if (id == null) { 293 308 if (requestIdEnabled) { 294 309 return null; 295 310 } 296 SentQuery result = currentlySentQuery;311 SentQuery<?> result = currentlySentQuery; 297 312 if (remove) { 298 313 currentlySentQuery = null; … … 302 317 } 303 318 if (queryMap.containsKey(id)) { 304 SentQuery result = queryMap.get(id);319 SentQuery<?> result = queryMap.get(id); 305 320 if (remove) { 306 321 queryMap.remove(id); … … 312 327 313 328 private int nextQueryId = 0; 314 315 private Integer stashQuery(SentQuery sentQuery) {316 if (!requestIdEnabled) {317 currentlySentQuery = sentQuery;318 return null;319 }320 queryMap.put(nextQueryId, sentQuery);321 return nextQueryId++;322 }323 329 324 330 protected void processMessage(InboundMessage inboundMessage) throws Exception { … … 342 348 return; 343 349 } 344 Subscription subscription = subscriptions.get(matcher.group(1));350 Subscription<?> subscription = subscriptions.get(matcher.group(1)); 345 351 if (subscription == null) { 346 352 log.error("non subscribed event: " + matcher.group(1)); … … 362 368 363 369 if (command.first.equals("file")) { 364 SentQuery sentQuery = fetchQuery(rest.first, false);370 SentQuery<?> sentQuery = fetchQuery(rest.first, false); 365 371 sentQuery.startFile(rest.second); 366 372 processMessage(sentQuery); … … 368 374 } 369 375 370 SentQuery sentQuery = fetchQuery(rest.first, true);376 SentQuery<?> sentQuery = fetchQuery(rest.first, true); 371 377 if (sentQuery == null) { 372 378 return; … … 374 380 log.debug("parsing response for request " + sentQuery); 375 381 376 final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()); 377 final ResponseCallback callback = sentQuery.callback; 378 379 Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() { 380 @Override 381 public void run() { 382 callback.process(response); 383 } 384 }); 382 sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles())); 385 383 } 386 384 -
java/main/src/main/java/com/framsticks/communication/Connection.java
r84 r85 1 1 package com.framsticks.communication; 2 2 3 import com.framsticks.util.io.Encoding; 3 4 import com.framsticks.util.lang.Pair; 4 5 import org.apache.log4j.Logger; … … 6 7 import java.io.IOException; 7 8 import java.io.InputStreamReader; 9 import java.io.OutputStreamWriter; 8 10 import java.io.PrintWriter; 9 import java.lang.Thread;10 11 import java.net.Socket; 11 12 import java.net.SocketTimeoutException; 12 13 import java.util.regex.Matcher; 13 14 import java.util.regex.Pattern; 15 16 import com.framsticks.util.dispatching.RunAt; 17 import com.framsticks.util.dispatching.Thread; 14 18 15 19 public abstract class Connection { … … 28 32 protected int protocolVersion = -1; 29 33 30 protected final com.framsticks.util.dispatching.Thread senderThread = new com.framsticks.util.dispatching.Thread();31 protected Thread receiverThread;34 protected final Thread<Connection> senderThread = new Thread<>(); 35 protected final Thread<Connection> receiverThread = new Thread<>(); 32 36 33 37 public boolean isConnected() { … … 42 46 senderThread.interrupt(); 43 47 senderThread.join(); 44 if (receiverThread != null) { 45 receiverThread.interrupt(); 46 receiverThread.join(); 47 receiverThread = null; 48 } 48 49 receiverThread.interrupt(); 50 receiverThread.join(); 49 51 50 52 if (output != null) { … … 72 74 73 75 protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))"; 74 protected static Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");75 protected static Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");76 protected static Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");76 protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$"); 77 protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$"); 78 protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n"); 77 79 78 80 … … 126 128 protected void runThreads() { 127 129 try { 128 output = new PrintWriter( socket.getOutputStream(), true);129 input = new BufferedReader(new InputStreamReader(socket.getInputStream() ));130 output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true); 131 input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset())); 130 132 } catch (IOException e) { 131 133 log.error("buffer creation failure"); … … 135 137 136 138 senderThread.setName(this + "-sender"); 137 receiverThread = new Thread(new Runnable() { 139 receiverThread.setName(this + "-receiver"); 140 141 senderThread.start(); 142 receiverThread.start(); 143 144 receiverThread.invokeLater(new RunAt<Connection>() { 138 145 @Override 139 146 public void run() { … … 148 155 } 149 156 }); 150 receiverThread.setName(this + "-receiver");151 157 152 senderThread.start();153 receiverThread.start();154 158 } 155 159 -
java/main/src/main/java/com/framsticks/communication/RequestHandler.java
r77 r85 7 7 */ 8 8 public interface RequestHandler { 9 public void handle(ApplicationRequest request, ResponseCallbackresponseCallback);9 public void handle(ApplicationRequest request, ResponseCallback<?> responseCallback); 10 10 } -
java/main/src/main/java/com/framsticks/communication/ResponseCallback.java
r84 r85 4 4 * @author Piotr Sniegowski 5 5 */ 6 public interface ResponseCallback {6 public interface ResponseCallback<C> { 7 7 public void process(Response response); 8 8 } -
java/main/src/main/java/com/framsticks/communication/ServerConnection.java
r84 r85 8 8 9 9 import java.net.Socket; 10 import com.framsticks.util.dispatching.RunAt; 10 11 11 12 /** … … 26 27 27 28 public void start() { 28 29 30 29 runThreads(); 31 30 } … … 43 42 } 44 43 45 protected void handleRequest(Request request, ResponseCallback responseCallback) {44 protected void handleRequest(Request request, ResponseCallback<?> responseCallback) { 46 45 if (request instanceof ApplicationRequest) { 47 requestHandler.handle((ApplicationRequest) request, responseCallback);46 requestHandler.handle((ApplicationRequest) request, responseCallback); 48 47 return; 49 48 } … … 70 69 71 70 protected final void respond(final Response response, final Integer id) { 72 senderThread.invokeLater(new Run nable() {71 senderThread.invokeLater(new RunAt<Connection>() { 73 72 @Override 74 73 public void run() { … … 118 117 } 119 118 120 handleRequest(request, new ResponseCallback () {119 handleRequest(request, new ResponseCallback<ServerConnection>() { 121 120 @Override 122 121 public void process(Response response) { -
java/main/src/main/java/com/framsticks/communication/StateCallback.java
r77 r85 6 6 * @author Piotr Sniegowski 7 7 */ 8 public abstract class StateCallback implements ResponseCallback, StateFunctor {8 public abstract class StateCallback<C> implements ResponseCallback<C>, StateFunctor { 9 9 @Override 10 10 public void process(Response response) { -
java/main/src/main/java/com/framsticks/communication/Subscription.java
r84 r85 1 1 package com.framsticks.communication; 2 2 3 import java.util.List; 4 3 5 import com.framsticks.communication.queries.RegistrationRequest; 4 import com.framsticks.util.dispatching.AtOnceDispatcher;5 6 import com.framsticks.util.dispatching.Dispatcher; 7 import com.framsticks.util.dispatching.Dispatching; 8 import com.framsticks.util.dispatching.RunAt; 6 9 import com.framsticks.util.StateFunctor; 7 10 import org.apache.log4j.Logger; … … 11 14 * @author Piotr Sniegowski 12 15 */ 13 public class Subscription {16 public class Subscription<C> { 14 17 15 18 private final static Logger log = Logger.getLogger(Subscription.class); 16 19 17 18 20 private final ClientConnection connection; 21 private final String path; 19 22 private final String registeredPath; 20 private Dispatcher dispatcher = AtOnceDispatcher.instance;23 private final Dispatcher<C> dispatcher; 21 24 22 25 private EventCallback eventCallback; 23 26 24 public Subscription(ClientConnection connection, String path, String registeredPath ) {27 public Subscription(ClientConnection connection, String path, String registeredPath, Dispatcher<C> dispatcher) { 25 28 this.connection = connection; 26 29 this.path = path; 27 30 this.registeredPath = registeredPath; 31 this.dispatcher = dispatcher; 28 32 } 29 33 … … 41 45 } 42 46 43 public void setDispatcher(Dispatcher dispatcher) { 44 this.dispatcher = dispatcher; 45 } 47 public void unsubscribe(final StateFunctor stateFunctor) { 48 //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path 49 connection.send(new RegistrationRequest().register(false).path(registeredPath), new ResponseCallback<Connection>() { 50 @Override 51 public void process(Response response) { 52 if (!response.getOk()) { 53 log.error("failed to unsunscribe " + this + ": " + response.getComment()); 54 stateFunctor.call(new Exception(response.getComment())); 55 return; 56 } 57 assert response.hasFiles(); 58 log.debug("unsunscribed " + this); 59 stateFunctor.call(null); 60 } 61 }); 62 } 46 63 47 public void unsubscribe(final StateFunctor stateFunctor) { 48 //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path 49 connection.send(new RegistrationRequest().register(false).setPath(registeredPath), new ResponseCallback() { 50 @Override 51 public void process(Response response) { 52 if (!response.getOk()) { 53 log.error("failed to unsunscribe " + this + ": " + response.getComment()); 54 stateFunctor.call(new Exception(response.getComment())); 55 return; 56 } 57 assert response.hasFiles(); 58 log.debug("unsunscribed " + this); 59 stateFunctor.call(null); 60 } 61 }); 62 } 64 public EventCallback getEventCallback() { 65 return eventCallback; 66 } 63 67 64 public EventCallback getEventCallback() {65 return eventCallback;66 68 public Dispatcher<C> getDispatcher() { 69 return dispatcher; 70 } 67 71 68 public Dispatcher getDispatcher() {69 return dispatcher;70 72 public void setEventCallback(EventCallback eventCallback) { 73 this.eventCallback = eventCallback; 74 } 71 75 72 public void setEventCallback(EventCallback eventCallback) { 73 this.eventCallback = eventCallback; 74 } 76 public void dispatchCall(final List<File> files) { 77 Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() { 78 @Override 79 public void run() { 80 eventCallback.call(files); 81 } 82 }); 83 } 75 84 } -
java/main/src/main/java/com/framsticks/communication/SubscriptionCallback.java
r77 r85 4 4 * @author Piotr Sniegowski 5 5 */ 6 public interface SubscriptionCallback {7 EventCallback subscribed(Subscription subscription);6 public interface SubscriptionCallback<C> { 7 EventCallback subscribed(Subscription<? super C> subscription); 8 8 } -
java/main/src/main/java/com/framsticks/communication/queries/ApplicationRequest.java
r84 r85 14 14 protected String fields; 15 15 16 public ApplicationRequest setPath(String path) {16 public ApplicationRequest path(String path) { 17 17 assert path != null; 18 18 this.path = path; … … 20 20 } 21 21 22 public ApplicationRequest setField(String field) {22 public ApplicationRequest field(String field) { 23 23 this.fields = field; 24 24 return this; 25 25 } 26 26 27 public ApplicationRequest setFields(Collection<String> fields) {27 public ApplicationRequest fields(Collection<String> fields) { 28 28 Delimeted d = new Delimeted(",", ""); 29 29 for (String f : fields) { 30 30 d.append(f); 31 31 } 32 return setField(d.build());32 return field(d.build()); 33 33 } 34 34 -
java/main/src/main/java/com/framsticks/communication/util/LoggingStateCallback.java
r84 r85 7 7 * @author Piotr Sniegowski 8 8 */ 9 public class LoggingStateCallback extends StateCallback{9 public class LoggingStateCallback<C> extends StateCallback<C> { 10 10 11 11 protected final Logger logger; … … 23 23 return; 24 24 } 25 25 logger.debug(message); 26 26 } 27 27 } -
java/main/src/main/java/com/framsticks/communication/util/LoggingSubscriptionCallback.java
r77 r85 9 9 * @author Piotr Sniegowski 10 10 */ 11 public class LoggingSubscriptionCallback implements SubscriptionCallback{11 public class LoggingSubscriptionCallback<C> implements SubscriptionCallback<C> { 12 12 13 14 15 13 protected final Logger logger; 14 protected final String message; 15 private final EventCallback eventCallback; 16 16 17 18 19 20 21 17 public LoggingSubscriptionCallback(Logger logger, String message, EventCallback eventCallback) { 18 this.logger = logger; 19 this.message = message; 20 this.eventCallback = eventCallback; 21 } 22 22 23 24 public EventCallback subscribed(Subscriptionsubscription) {25 26 27 28 29 30 31 23 @Override 24 public EventCallback subscribed(Subscription<? super C> subscription) { 25 if (subscription == null) { 26 logger.error("failed to subscribe for " + message); 27 return null; 28 } 29 logger.info("successfuly subscribed for " + message); 30 return eventCallback; 31 } 32 32 }
Note: See TracChangeset
for help on using the changeset viewer.