source: java/main/src/main/java/com/framsticks/communication/ClientConnection.java @ 77

Last change on this file since 77 was 77, checked in by psniegowski, 11 years ago

Add new java codebase.

File size: 12.4 KB
Line 
1package com.framsticks.communication;
2
3import com.framsticks.communication.queries.ApplicationRequest;
4import com.framsticks.communication.queries.RegistrationRequest;
5import com.framsticks.communication.queries.UseRequest;
6import com.framsticks.communication.queries.VersionRequest;
7import com.framsticks.communication.util.LoggingStateCallback;
8import com.framsticks.params.ListSource;
9import com.framsticks.util.*;
10import org.apache.log4j.Logger;
11
12import java.io.BufferedReader;
13import java.io.IOException;
14import java.io.InputStreamReader;
15import java.io.PrintWriter;
16import java.net.Socket;
17import java.net.SocketException;
18import java.util.*;
19import java.util.regex.Matcher;
20import java.util.regex.Pattern;
21
22/**
23 * @author Piotr Sniegowski
24 */
25public class ClientConnection extends Connection {
26
27    private final static Logger LOGGER = Logger.getLogger(ClientConnection.class);
28
29    protected final Map<String, Subscription> subscriptions = new HashMap<String, Subscription>();
30
31    public String getAddress() {
32        return address;
33    }
34
35    public void connect(StateFunctor connectedFunctor) {
36        try {
37            LOGGER.info("connecting to " + address);
38
39            socket = new Socket(hostName, port);
40
41            socket.setSoTimeout(500);
42
43            LOGGER.info("connected to " + hostName + ":" + port);
44            connected = true;
45
46            runThreads();
47
48            connectedFunctor.call(null);
49        } catch (SocketException e) {
50            LOGGER.error("failed to connect: " + e);
51            connectedFunctor.call(e);
52        } catch (IOException e) {
53            LOGGER.error("buffer creation failure");
54            connectedFunctor.call(e);
55            close();
56        }
57    }
58
59    private static abstract class InboundMessage {
60        protected String currentFilePath;
61        protected List<String> currentFileContent;
62        protected final List<File> files = new ArrayList<File>();
63
64        public abstract void eof();
65
66        protected void initCurrentFile(String path) {
67            currentFileContent = new LinkedList<String>();
68            currentFilePath = path;
69        }
70        protected void finishCurrentFile() {
71            if (currentFileContent == null) {
72                return;
73            }
74            files.add(new File(currentFilePath, new ListSource(currentFileContent)));
75            currentFilePath = null;
76            currentFileContent= null;
77        }
78
79        public abstract void startFile(String path);
80
81        public void addLine(String line) {
82            assert line != null;
83            assert currentFileContent != null;
84            currentFileContent.add(line.substring(0, line.length() - 1));
85        }
86
87        public List<File> getFiles() {
88            return files;
89        }
90    }
91
92    private static class EventFire extends InboundMessage {
93        public final Subscription subscription;
94
95        private EventFire(Subscription subscription) {
96            this.subscription = subscription;
97        }
98
99        public void startFile(String path) {
100            assert path == null;
101            initCurrentFile(null);
102        }
103
104        @Override
105        public void eof() {
106            finishCurrentFile();
107            Dispatching.invokeLaterOrNow(subscription.getDispatcher(), new Runnable() {
108                @Override
109                public void run() {
110                    subscription.getEventCallback().call(getFiles());
111                }
112            });
113        }
114    }
115
116    private static class SentQuery extends InboundMessage {
117        Request request;
118        ResponseCallback callback;
119        Dispatcher dispatcher;
120
121        public void startFile(String path) {
122            finishCurrentFile();
123            if (path == null) {
124                assert request instanceof ApplicationRequest;
125                path = ((ApplicationRequest)request).getPath();
126            }
127            initCurrentFile(path);
128        }
129
130        public void eof() {
131            finishCurrentFile();
132            //no-operation
133        }
134
135        @Override
136        public String toString() {
137            return request.toString();
138        }
139    }
140    private Map<Integer, SentQuery> queryMap = new HashMap<Integer, SentQuery>();
141
142
143    protected final String address;
144    protected final String hostName;
145    protected final int port;
146
147    private static Pattern addressPattern = Pattern.compile("^([^:]*)(:([0-9]+))?$");
148
149    public ClientConnection(String address) {
150        assert address != null;
151        this.address = address;
152        Matcher matcher = addressPattern.matcher(address);
153        if (!matcher.matches()) {
154            LOGGER.fatal("invalid address: " + address);
155            hostName = null;
156            port = 0;
157            return;
158        }
159        hostName = matcher.group(1);
160        port = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 9009;
161    }
162
163    private SentQuery currentlySentQuery;
164
165    public void send(Request request, ResponseCallback callback) {
166        send(request, AtOnceDispatcher.instance, callback);
167    }
168
169    public void send(Request request, Dispatcher dispatcher, ResponseCallback callback) {
170
171        if (!isConnected()) {
172            LOGGER.fatal("not connected");
173            return;
174        }
175        final SentQuery sentQuery = new SentQuery();
176        sentQuery.request = request;
177        sentQuery.callback = callback;
178        sentQuery.dispatcher = dispatcher;
179
180        senderThread.invokeLater(new Runnable(){
181            @Override
182            public void run() {
183                synchronized (ClientConnection.this) {
184
185                    while (!(requestIdEnabled || currentlySentQuery == null)) {
186                        try {
187                            ClientConnection.this.wait();
188                        } catch (InterruptedException ignored) {
189                            break;
190                        }
191                    }
192                }
193                Integer id = stashQuery(sentQuery);
194                String command = sentQuery.request.getCommand();
195                StringBuilder message = new StringBuilder();
196                message.append(command);
197                if (id != null) {
198                    message.append(" ").append(id);
199                }
200                sentQuery.request.construct(message);
201                String out = message.toString();
202
203                output.println(out);
204                LOGGER.debug("sending query: " + out);
205
206            }
207        });
208        /*
209        synchronized (this) {
210            LOGGER.debug("queueing query: " + query);
211            queryQueue.offer(sentQuery);
212            notifyAll();
213        }
214        */
215    }
216
217
218    @Override
219    public String toString() {
220        return address;
221    }
222
223    public void subscribe(final String path, final SubscriptionCallback callback) {
224        send(new RegistrationRequest().setPath(path), new ResponseCallback() {
225            @Override
226            public void process(Response response) {
227                if (!response.getOk()) {
228                    LOGGER.error("failed to register on event: " + path);
229                    callback.subscribed(null);
230                    return;
231                }
232                assert response.getFiles().isEmpty();
233                Subscription subscription = new Subscription(ClientConnection.this, path, response.getComment());
234                LOGGER.debug("registered on event: " + subscription);
235                synchronized (subscriptions) {
236                    subscriptions.put(subscription.getRegisteredPath(), subscription);
237                }
238                subscription.setEventCallback(callback.subscribed(subscription));
239                if (subscription.getEventCallback() == null) {
240                    LOGGER.info("subscription for " + path + " aborted");
241                    subscription.unsubscribe(new LoggingStateCallback(LOGGER, "abort subscription"));
242                }
243            }
244        });
245    }
246
247    public void negotiateProtocolVersion(StateFunctor stateFunctor) {
248        protocolVersion = -1;
249        sendQueryVersion(1, stateFunctor);
250    }
251
252    public void sendQueryVersion(final int version, final StateFunctor stateFunctor) {
253        send(new VersionRequest().version(version), new StateCallback() {
254            @Override
255            public void call(Exception e) {
256                if (e != null) {
257                    LOGGER.fatal("failed to upgrade protocol to version: " + version);
258                    return;
259                }
260                protocolVersion = version;
261                if (version < 4) {
262                    /** it is an implicit loop here*/
263                    sendQueryVersion(version + 1, stateFunctor);
264                    return;
265                }
266                send(new UseRequest().feature("request_id"), new StateCallback() {
267                    @Override
268                    public void call(Exception e) {
269                        requestIdEnabled = e == null;
270                        /*
271                        synchronized (ClientConnection.this) {
272                            ClientConnection.this.notifyAll();
273                        }
274                        */
275                        if (!requestIdEnabled) {
276                            LOGGER.fatal("protocol negotiation failed");
277                            stateFunctor.call(new Exception("protocol negotiation failed", e));
278                            return;
279                        }
280                        stateFunctor.call(null);
281                    }
282                });
283
284            }
285        });
286    }
287
288
289    private synchronized SentQuery fetchQuery(Integer id, boolean remove) {
290        if (id == null) {
291            if (requestIdEnabled) {
292                return null;
293            }
294            SentQuery result = currentlySentQuery;
295            if (remove) {
296                currentlySentQuery = null;
297                notifyAll();
298            }
299            return result;
300        }
301        if (queryMap.containsKey(id)) {
302            SentQuery result = queryMap.get(id);
303            if (remove) {
304                queryMap.remove(id);
305            }
306            return result;
307        }
308        return null;
309    }
310
311    private int nextQueryId = 0;
312
313    private Integer stashQuery(SentQuery sentQuery) {
314        if (!requestIdEnabled) {
315            currentlySentQuery = sentQuery;
316            return null;
317        }
318        queryMap.put(nextQueryId, sentQuery);
319        return nextQueryId++;
320    }
321
322    protected void processMessage(InboundMessage inboundMessage) throws Exception {
323        if (inboundMessage == null) {
324            LOGGER.error("failed to use any inbound message");
325            return;
326        }
327
328        String line;
329        while (!(line = getLine()).startsWith("eof")) {
330            inboundMessage.addLine(line);
331        }
332        inboundMessage.eof();
333    }
334
335    protected void processEvent(String rest) throws Exception {
336        Matcher matcher = eventPattern.matcher(rest);
337        if (!matcher.matches()) {
338            LOGGER.error("invalid event line: " + rest);
339            return;
340        }
341        Subscription subscription = subscriptions.get(matcher.group(1));
342        if (subscription == null) {
343            LOGGER.error("non subscribed event: " + matcher.group(1));
344            return;
345        }
346        EventFire event = new EventFire(subscription);
347        event.startFile(null);
348        processMessage(event);
349    }
350
351
352    protected void processMessageStartingWith(String line) throws Exception {
353        Pair<String, String> command = Strings.splitIntoPair(line, ' ', "\n");
354        if (command.first.equals("event")) {
355            processEvent(command.second);
356            return;
357        }
358        Pair<Integer, String> rest = parseRest(command.second);
359
360        if (command.first.equals("file")) {
361            SentQuery sentQuery = fetchQuery(rest.first, false);
362            sentQuery.startFile(rest.second);
363            processMessage(sentQuery);
364            return;
365        }
366
367        SentQuery sentQuery = fetchQuery(rest.first, true);
368        if (sentQuery == null) {
369            return;
370        }
371        LOGGER.debug("parsing response for request " + sentQuery);
372
373        final Response response = new Response(command.first.equals("ok"), rest.second, sentQuery.getFiles());
374        final ResponseCallback callback = sentQuery.callback;
375
376        Dispatching.invokeLaterOrNow(sentQuery.dispatcher, new Runnable() {
377            @Override
378            public void run() {
379                callback.process(response);
380            }
381        });
382    }
383
384    @Override
385    protected void receiverThreadRoutine() throws Exception {
386        while (connected) {
387            processMessageStartingWith(getLine());
388        }
389    }
390
391}
Note: See TracBrowser for help on using the repository browser.