Ignore:
Timestamp:
06/24/13 13:38:40 (11 years ago)
Author:
psniegowski
Message:

HIGHLIGHTS:

  • upgrade to Java 7
    • use try-multi-catch clauses
    • use try-with-resources were appropriate
  • configure FindBugs? (use mvn site and then navigate in browser to the report)
    • remove most bugs found
  • parametrize Dispatching environment (Dispatcher, RunAt?) to enforce more control on the place of closures actual call

CHANGELOG:
Rework FavouritesXMLFactory.

FindBugs?. Thread start.

FindBugs?. Minor change.

FindBugs?. Iterate over entrySet.

FindBugs?. Various.

FindBug?.

FindBug?. Encoding.

FindBug?. Final fields.

FindBug?.

Remove synchronization bug in ClientConnection?.

Experiments with findbugs.

Finish parametrization.

Make RunAt? an abstract class.

More changes in parametrization.

More changes in parametrizing dispatching.

Several changes to parametrize tasks.

Rename Runnable to RunAt?.

Add specific framsticks Runnable.

Add JSR305 (annotations).

Add findbugs reporting.

More improvements to ParamBuilder? wording.

Make FramsClass? accept also ParamBuilder?.

Change wording of ParamBuilder?.

Change wording of Request creation.

Use Java 7 exception catch syntax.

Add ScopeEnd? class.

Upgrade to Java 7.

Location:
java/main/src/main/java/com/framsticks/communication
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • java/main/src/main/java/com/framsticks/communication/ClientConnection.java

    r84 r85  
    2121import java.util.regex.Matcher;
    2222import java.util.regex.Pattern;
     23import com.framsticks.util.dispatching.RunAt;
    2324
    2425/**
     
    2930        private final static Logger log = Logger.getLogger(ClientConnection.class);
    3031
    31         protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>();
     32        protected final Map<String, Subscription<?>> subscriptions = new HashMap<>();
    3233
    3334        public String getAddress() {
     
    9394
    9495        private static class EventFire extends InboundMessage {
    95                 public final Subscription subscription;
    96 
    97                 private EventFire(Subscription subscription) {
     96                public final Subscription<?> subscription;
     97
     98                private EventFire(Subscription<?> subscription) {
    9899                        this.subscription = subscription;
    99100                }
     
    107108                public void eof() {
    108109                        finishCurrentFile();
    109                         Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() {
    110                                 @Override
    111                                 public void run() {
    112                                         subscription.getEventCallback().call(getFiles());
    113                                 }
    114                         });
    115                 }
    116         }
    117 
    118         private static class SentQuery extends InboundMessage {
     110
     111                        subscription.dispatchCall(getFiles());
     112                }
     113        }
     114
     115        private static class SentQuery<C> extends InboundMessage {
    119116                Request request;
    120                 ResponseCallback callback;
    121                 Dispatcher dispatcher;
     117                ResponseCallback<? extends C> callback;
     118                Dispatcher<C> dispatcher;
    122119
    123120                public void startFile(String path) {
     
    139136                        return request.toString();
    140137                }
    141         }
    142         private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>();
     138
     139                public void dispatchResponseProcess(final Response response) {
     140                        Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() {
     141                                @Override
     142                                public void run() {
     143                                        callback.process(response);
     144                                }
     145                        });
     146                }
     147        }
     148        private Map<Integer, SentQuery<?>> queryMap = new HashMap<>();
    143149
    144150
     
    163169        }
    164170
    165         private SentQuery currentlySentQuery;
    166 
    167         public void send(Request request, ResponseCallback callback) {
    168                 send(request, AtOnceDispatcher.instance, callback);
    169         }
    170 
    171         public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) {
     171        private SentQuery<?> currentlySentQuery;
     172
     173
     174        public <C extends Connection> void send(Request request, ResponseCallback<C> callback) {
     175                //TODO RunAt
     176                send(request, AtOnceDispatcher.getInstance(), callback);
     177        }
     178
     179        public <C> void send(Request request, Dispatcher<C> dispatcher, ResponseCallback<? extends C> callback) {
    172180
    173181                if (!isConnected()) {
     
    175183                        return;
    176184                }
    177                 final SentQuery sentQuery = new SentQuery();
     185                final SentQuery<C> sentQuery = new SentQuery<C>();
    178186                sentQuery.request = request;
    179187                sentQuery.callback = callback;
    180188                sentQuery.dispatcher = dispatcher;
    181189
    182                 senderThread.invokeLater(new Runnable(){
     190                senderThread.invokeLater(new RunAt<Connection>(){
    183191                        @Override
    184192                        public void run() {
     193                                Integer id;
    185194                                synchronized (ClientConnection.this) {
    186195
     
    192201                                                }
    193202                                        }
    194                                 }
    195                                 Integer id = stashQuery(sentQuery);
     203                                        if (requestIdEnabled) {
     204                                                queryMap.put(nextQueryId, sentQuery);
     205                                                id = nextQueryId++;
     206                                        } else {
     207                                                currentlySentQuery = sentQuery;
     208                                                id = null;
     209                                        }
     210                                }
    196211                                String command = sentQuery.request.getCommand();
    197212                                StringBuilder message = new StringBuilder();
     
    223238        }
    224239
    225         public void subscribe(final String path, final SubscriptionCallback callback) {
    226                 send(new RegistrationRequest().setPath(path), new ResponseCallback() {
     240        public <C> void subscribe(final String path, final Dispatcher<C> dispatcher, final SubscriptionCallback<? extends C> callback) {
     241                send(new RegistrationRequest().path(path), new ResponseCallback<Connection>() {
    227242                        @Override
    228243                        public void process(Response response) {
     
    233248                                }
    234249                                assert response.getFiles().isEmpty();
    235                                 Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment());
     250                                Subscription<C> subscription = new Subscription<C>(ClientConnection.this, path, response.getComment(), dispatcher);
    236251                                log.debug("registered on event: " + subscription);
    237252                                synchronized (subscriptions) {
     
    241256                                if (subscription.getEventCallback() == null) {
    242257                                        log.info("subscription for " + path + " aborted");
    243                                         subscription.unsubscribe(new LoggingStateCallback(log, "abort subscription"));
     258                                        subscription.unsubscribe(new LoggingStateCallback<C>(log, "abort subscription"));
    244259                                }
    245260                        }
     
    253268
    254269        public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
    255                 send(new VersionRequest().version(version), new StateCallback() {
     270                send(new VersionRequest().version(version), new StateCallback<Connection>() {
    256271                        @Override
    257272                        public void call(Exception e) {
     
    266281                                        return;
    267282                                }
    268                                 send(new UseRequest().feature("request_id"), new StateCallback() {
     283                                send(new UseRequest().feature("request_id"), new StateCallback<Connection>() {
    269284                                        @Override
    270285                                        public void call(Exception e) {
     
    289304
    290305
    291         private synchronized SentQuery fetchQuery(Integer id, boolean remove) {
     306        private synchronized SentQuery<?> fetchQuery(Integer id, boolean remove) {
    292307                if (id == null) {
    293308                        if (requestIdEnabled) {
    294309                                return null;
    295310                        }
    296                         SentQuery result = currentlySentQuery;
     311                        SentQuery<?> result = currentlySentQuery;
    297312                        if (remove) {
    298313                                currentlySentQuery = null;
     
    302317                }
    303318                if (queryMap.containsKey(id)) {
    304                         SentQuery result = queryMap.get(id);
     319                        SentQuery<?> result = queryMap.get(id);
    305320                        if (remove) {
    306321                                queryMap.remove(id);
     
    312327
    313328        private int nextQueryId = 0;
    314 
    315         private Integer stashQuery(SentQuery sentQuery) {
    316                 if (!requestIdEnabled) {
    317                         currentlySentQuery = sentQuery;
    318                         return null;
    319                 }
    320                 queryMap.put(nextQueryId, sentQuery);
    321                 return nextQueryId++;
    322         }
    323329
    324330        protected void processMessage(InboundMessage inboundMessage) throws Exception {
     
    342348                        return;
    343349                }
    344                 Subscription subscription = subscriptions.get(matcher.group(1));
     350                Subscription<?> subscription = subscriptions.get(matcher.group(1));
    345351                if (subscription == null) {
    346352                        log.error("non subscribed event: " + matcher.group(1));
     
    362368
    363369                if (command.first.equals("file")) {
    364                         SentQuery sentQuery = fetchQuery(rest.first, false);
     370                        SentQuery<?> sentQuery = fetchQuery(rest.first, false);
    365371                        sentQuery.startFile(rest.second);
    366372                        processMessage(sentQuery);
     
    368374                }
    369375
    370                 SentQuery sentQuery = fetchQuery(rest.first, true);
     376                SentQuery<?> sentQuery = fetchQuery(rest.first, true);
    371377                if (sentQuery == null) {
    372378                        return;
     
    374380                log.debug("parsing response for request " + sentQuery);
    375381
    376                 final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles());
    377                 final ResponseCallback callback = sentQuery.callback;
    378 
    379                 Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() {
    380                         @Override
    381                         public void run() {
    382                                 callback.process(response);
    383                         }
    384                 });
     382                sentQuery.dispatchResponseProcess(new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles()));
    385383        }
    386384
  • java/main/src/main/java/com/framsticks/communication/Connection.java

    r84 r85  
    11package com.framsticks.communication;
    22
     3import com.framsticks.util.io.Encoding;
    34import com.framsticks.util.lang.Pair;
    45import org.apache.log4j.Logger;
     
    67import java.io.IOException;
    78import java.io.InputStreamReader;
     9import java.io.OutputStreamWriter;
    810import java.io.PrintWriter;
    9 import java.lang.Thread;
    1011import java.net.Socket;
    1112import java.net.SocketTimeoutException;
    1213import java.util.regex.Matcher;
    1314import java.util.regex.Pattern;
     15
     16import com.framsticks.util.dispatching.RunAt;
     17import com.framsticks.util.dispatching.Thread;
    1418
    1519public abstract class Connection {
     
    2832        protected int protocolVersion = -1;
    2933
    30         protected final com.framsticks.util.dispatching.Thread senderThread = new com.framsticks.util.dispatching.Thread();
    31         protected Thread receiverThread;
     34        protected final Thread<Connection> senderThread = new Thread<>();
     35        protected final Thread<Connection> receiverThread = new Thread<>();
    3236
    3337        public boolean isConnected() {
     
    4246                        senderThread.interrupt();
    4347                        senderThread.join();
    44                         if (receiverThread != null) {
    45                                 receiverThread.interrupt();
    46                                 receiverThread.join();
    47                                 receiverThread = null;
    48                         }
     48
     49                        receiverThread.interrupt();
     50                        receiverThread.join();
    4951
    5052                        if (output != null) {
     
    7274
    7375        protected static final String ARGUMENT_PATTERN_FRAGMENT = "((?:\\S+)|(?:\"[^\"]*\"))";
    74         protected static Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
    75         protected static Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
    76         protected static Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
     76        protected static final Pattern requestIdEnabledPattern = Pattern.compile("^\\s*([0-9]+)(?:\\s+" + ARGUMENT_PATTERN_FRAGMENT + ")?\\n$");
     77        protected static final Pattern requestIDisabledPattern = Pattern.compile("^\\s*" + ARGUMENT_PATTERN_FRAGMENT + "?\\n$");
     78        protected static final Pattern eventPattern = Pattern.compile("^\\s*(\\S+)\\s*(\\S+)\\n");
    7779
    7880
     
    126128        protected void runThreads() {
    127129                try {
    128                         output = new PrintWriter(socket.getOutputStream(), true);
    129                         input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
     130                        output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true);
     131                        input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset()));
    130132                } catch (IOException e) {
    131133                        log.error("buffer creation failure");
     
    135137
    136138                senderThread.setName(this + "-sender");
    137                 receiverThread = new Thread(new Runnable() {
     139                receiverThread.setName(this + "-receiver");
     140
     141                senderThread.start();
     142                receiverThread.start();
     143
     144                receiverThread.invokeLater(new RunAt<Connection>() {
    138145                        @Override
    139146                        public void run() {
     
    148155                        }
    149156                });
    150                 receiverThread.setName(this + "-receiver");
    151157
    152                 senderThread.start();
    153                 receiverThread.start();
    154158        }
    155159
  • java/main/src/main/java/com/framsticks/communication/RequestHandler.java

    r77 r85  
    77 */
    88public interface RequestHandler {
    9     public void handle(ApplicationRequest request, ResponseCallback responseCallback);
     9        public void handle(ApplicationRequest request, ResponseCallback<?> responseCallback);
    1010}
  • java/main/src/main/java/com/framsticks/communication/ResponseCallback.java

    r84 r85  
    44 * @author Piotr Sniegowski
    55 */
    6 public interface ResponseCallback {
     6public interface ResponseCallback<C> {
    77        public void process(Response response);
    88}
  • java/main/src/main/java/com/framsticks/communication/ServerConnection.java

    r84 r85  
    88
    99import java.net.Socket;
     10import com.framsticks.util.dispatching.RunAt;
    1011
    1112/**
     
    2627
    2728        public void start() {
    28 
    29 
    3029                runThreads();
    3130        }
     
    4342        }
    4443
    45         protected void handleRequest(Request request, ResponseCallback responseCallback) {
     44        protected void handleRequest(Request request, ResponseCallback<?> responseCallback) {
    4645                if (request instanceof ApplicationRequest) {
    47                         requestHandler.handle((ApplicationRequest)request, responseCallback);
     46                        requestHandler.handle((ApplicationRequest) request, responseCallback);
    4847                        return;
    4948                }
     
    7069
    7170        protected final void respond(final Response response, final Integer id) {
    72                 senderThread.invokeLater(new Runnable() {
     71                senderThread.invokeLater(new RunAt<Connection>() {
    7372                        @Override
    7473                        public void run() {
     
    118117                }
    119118
    120                 handleRequest(request, new ResponseCallback() {
     119                handleRequest(request, new ResponseCallback<ServerConnection>() {
    121120                        @Override
    122121                        public void process(Response response) {
  • java/main/src/main/java/com/framsticks/communication/StateCallback.java

    r77 r85  
    66 * @author Piotr Sniegowski
    77 */
    8 public abstract class StateCallback implements ResponseCallback, StateFunctor {
     8public abstract class StateCallback<C> implements ResponseCallback<C>, StateFunctor {
    99        @Override
    1010        public void process(Response response) {
  • java/main/src/main/java/com/framsticks/communication/Subscription.java

    r84 r85  
    11package com.framsticks.communication;
    22
     3import java.util.List;
     4
    35import com.framsticks.communication.queries.RegistrationRequest;
    4 import com.framsticks.util.dispatching.AtOnceDispatcher;
    56import com.framsticks.util.dispatching.Dispatcher;
     7import com.framsticks.util.dispatching.Dispatching;
     8import com.framsticks.util.dispatching.RunAt;
    69import com.framsticks.util.StateFunctor;
    710import org.apache.log4j.Logger;
     
    1114 * @author Piotr Sniegowski
    1215 */
    13 public class Subscription {
     16public class Subscription<C> {
    1417
    15     private final static Logger log = Logger.getLogger(Subscription.class);
     18        private final static Logger log = Logger.getLogger(Subscription.class);
    1619
    17     private final ClientConnection connection;
    18     private final String path;
     20        private final ClientConnection connection;
     21        private final String path;
    1922        private final String registeredPath;
    20     private Dispatcher dispatcher = AtOnceDispatcher.instance;
     23        private final Dispatcher<C> dispatcher;
    2124
    2225        private EventCallback eventCallback;
    2326
    24         public Subscription(ClientConnection connection, String path, String registeredPath) {
     27        public Subscription(ClientConnection connection, String path, String registeredPath, Dispatcher<C> dispatcher) {
    2528                this.connection = connection;
    26         this.path = path;
     29                this.path = path;
    2730                this.registeredPath = registeredPath;
     31                this.dispatcher = dispatcher;
    2832        }
    2933
     
    4145        }
    4246
    43     public void setDispatcher(Dispatcher dispatcher) {
    44         this.dispatcher = dispatcher;
    45     }
     47        public void unsubscribe(final StateFunctor stateFunctor) {
     48                //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path
     49                connection.send(new RegistrationRequest().register(false).path(registeredPath), new ResponseCallback<Connection>() {
     50                        @Override
     51                        public void process(Response response) {
     52                                if (!response.getOk()) {
     53                                        log.error("failed to unsunscribe " + this + ": " + response.getComment());
     54                                        stateFunctor.call(new Exception(response.getComment()));
     55                                        return;
     56                                }
     57                                assert response.hasFiles();
     58                                log.debug("unsunscribed " + this);
     59                                stateFunctor.call(null);
     60                        }
     61                });
     62        }
    4663
    47     public void unsubscribe(final StateFunctor stateFunctor) {
    48         //@todo remove that /cli/ prefix, when registeredPath will be a fully qualified path
    49         connection.send(new RegistrationRequest().register(false).setPath(registeredPath), new ResponseCallback() {
    50             @Override
    51             public void process(Response response) {
    52                 if (!response.getOk()) {
    53                     log.error("failed to unsunscribe " + this + ": " + response.getComment());
    54                     stateFunctor.call(new Exception(response.getComment()));
    55                     return;
    56                 }
    57                 assert response.hasFiles();
    58                 log.debug("unsunscribed " + this);
    59                 stateFunctor.call(null);
    60             }
    61         });
    62     }
     64        public EventCallback getEventCallback() {
     65                return eventCallback;
     66        }
    6367
    64     public EventCallback getEventCallback() {
    65         return eventCallback;
    66     }
     68        public Dispatcher<C> getDispatcher() {
     69                return dispatcher;
     70        }
    6771
    68     public Dispatcher getDispatcher() {
    69         return dispatcher;
    70     }
     72        public void setEventCallback(EventCallback eventCallback) {
     73                this.eventCallback = eventCallback;
     74        }
    7175
    72     public void setEventCallback(EventCallback eventCallback) {
    73         this.eventCallback = eventCallback;
    74     }
     76        public void dispatchCall(final List<File> files) {
     77                Dispatching.invokeLaterOrNow(dispatcher, new RunAt<C>() {
     78                        @Override
     79                        public void run() {
     80                                eventCallback.call(files);
     81                        }
     82                });
     83        }
    7584}
  • java/main/src/main/java/com/framsticks/communication/SubscriptionCallback.java

    r77 r85  
    44 * @author Piotr Sniegowski
    55 */
    6 public interface SubscriptionCallback {
    7         EventCallback subscribed(Subscription subscription);
     6public interface SubscriptionCallback<C> {
     7        EventCallback subscribed(Subscription<? super C> subscription);
    88}
  • java/main/src/main/java/com/framsticks/communication/queries/ApplicationRequest.java

    r84 r85  
    1414        protected String fields;
    1515
    16         public ApplicationRequest setPath(String path) {
     16        public ApplicationRequest path(String path) {
    1717                assert path != null;
    1818                this.path = path;
     
    2020        }
    2121
    22         public ApplicationRequest setField(String field) {
     22        public ApplicationRequest field(String field) {
    2323                this.fields = field;
    2424                return this;
    2525        }
    2626
    27         public ApplicationRequest setFields(Collection<String> fields) {
     27        public ApplicationRequest fields(Collection<String> fields) {
    2828                Delimeted d = new Delimeted(",", "");
    2929                for (String f : fields) {
    3030                        d.append(f);
    3131                }
    32                 return setField(d.build());
     32                return field(d.build());
    3333        }
    3434
  • java/main/src/main/java/com/framsticks/communication/util/LoggingStateCallback.java

    r84 r85  
    77 * @author Piotr Sniegowski
    88 */
    9 public class LoggingStateCallback extends StateCallback {
     9public class LoggingStateCallback<C> extends StateCallback<C> {
    1010
    1111        protected final Logger logger;
     
    2323                        return;
    2424                }
    25         logger.debug(message);
     25                logger.debug(message);
    2626        }
    2727}
  • java/main/src/main/java/com/framsticks/communication/util/LoggingSubscriptionCallback.java

    r77 r85  
    99 * @author Piotr Sniegowski
    1010 */
    11 public class LoggingSubscriptionCallback implements SubscriptionCallback {
     11public class LoggingSubscriptionCallback<C> implements SubscriptionCallback<C> {
    1212
    13     protected final Logger logger;
    14     protected final String message;
    15     private final EventCallback eventCallback;
     13        protected final Logger logger;
     14        protected final String message;
     15        private final EventCallback eventCallback;
    1616
    17     public LoggingSubscriptionCallback(Logger logger, String message, EventCallback eventCallback) {
    18         this.logger = logger;
    19         this.message = message;
    20         this.eventCallback = eventCallback;
    21     }
     17        public LoggingSubscriptionCallback(Logger logger, String message, EventCallback eventCallback) {
     18                this.logger = logger;
     19                this.message = message;
     20                this.eventCallback = eventCallback;
     21        }
    2222
    23     @Override
    24     public EventCallback subscribed(Subscription subscription) {
    25         if (subscription == null) {
    26             logger.error("failed to subscribe for " + message);
    27             return null;
    28         }
    29         logger.info("successfuly subscribed for " + message);
    30         return eventCallback;
    31     }
     23        @Override
     24        public EventCallback subscribed(Subscription<? super C> subscription) {
     25                if (subscription == null) {
     26                        logger.error("failed to subscribe for " + message);
     27                        return null;
     28                }
     29                logger.info("successfuly subscribed for " + message);
     30                return eventCallback;
     31        }
    3232}
Note: See TracChangeset for help on using the changeset viewer.