package com.framsticks.communication; import com.framsticks.params.Source; import com.framsticks.params.annotations.AutoAppendAnnotation; import com.framsticks.params.annotations.FramsClassAnnotation; import com.framsticks.params.annotations.ParamAnnotation; import com.framsticks.util.FramsticksException; import com.framsticks.util.io.Encoding; import com.framsticks.util.lang.Strings; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.util.Collection; import java.util.HashSet; import java.util.Set; import com.framsticks.util.dispatching.AbstractJoinable; import com.framsticks.util.dispatching.Dispatcher; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.ExceptionResultHandler; import com.framsticks.util.dispatching.Joinable; import com.framsticks.util.dispatching.JoinableCollection; import com.framsticks.util.dispatching.JoinableParent; import com.framsticks.util.dispatching.JoinableState; import com.framsticks.util.dispatching.RunAt; import com.framsticks.util.dispatching.Thread; import com.framsticks.util.dispatching.ThrowExceptionHandler; @FramsClassAnnotation public abstract class Connection extends AbstractJoinable implements JoinableParent, ExceptionResultHandler { protected final static Logger log = LogManager.getLogger(Connection.class); private PrintWriter output = null; private BufferedReader input = null; protected Socket socket = null; protected Address address; protected String description = "connection"; protected final Thread senderThread = new Thread<>(); protected final Thread receiverThread = new Thread<>(); protected final JoinableCollection> threads = new JoinableCollection<>(); protected final Set listeners = new HashSet<>(); protected ExceptionResultHandler exceptionHandler = ThrowExceptionHandler.getInstance(); /** * */ public Connection() { threads.add(senderThread); threads.add(receiverThread); } protected void updateNames() { if (address == null) { return; } senderThread.setName(description + " thread " + address + " sender"); receiverThread.setName(description + " thread " + address + " receiver"); threads.setObservableName(address + " connection threads"); } public void setDescription(String description) { this.description = description; updateNames(); } @AutoAppendAnnotation public Connection setAddress(Address address) { this.address = address; updateNames(); return this; } @ParamAnnotation public Connection setAddress(String address) { return setAddress(new Address(address)); } static final int BUFFER_LENGTH = 1024; int readChars = 0; int iterator = 0; int bufferStart = 0; final char[] readBuffer = new char[BUFFER_LENGTH]; protected String getLine() { final StringBuilder lineBuffer = new StringBuilder(); try { while (!Thread.interrupted()) { while (iterator < readChars) { if (readBuffer[iterator] != '\n') { ++iterator; continue; } /** Do not append new line. */ lineBuffer.append(readBuffer, bufferStart, iterator - bufferStart); ++iterator; bufferStart = iterator; String line = lineBuffer.toString(); synchronized (listeners) { for (ConnectionListener l : listeners) { l.connectionIncomming(line); } } return line; } final int length = readChars - bufferStart; if (length > 0) { assert bufferStart >= 0 && bufferStart < BUFFER_LENGTH; assert bufferStart + length <= BUFFER_LENGTH; lineBuffer.append(readBuffer, bufferStart, length); } readChars = 0; readChars = input.read(readBuffer); if (readChars < 0) { throw new SocketClosedException().msg("socket is closed"); } iterator = 0; bufferStart = 0; } throw new InterruptedException(); } catch (Exception e) { log.debug("failed to read line (closing): {}", e.getMessage()); throw new SocketClosedException().msg("failed to read line").cause(e); } } protected void putLine(String line) { synchronized (listeners) { for (ConnectionListener l : listeners) { l.connectionOutgoing(line); } } output.println(line); } protected void flushOut() { output.flush(); } protected abstract void processNextInputBatch(); protected final void processInputBatchesUntilClosed() { while (isRunning() && !socket.isClosed()) { try { processNextInputBatch(); } catch (SocketClosedException e) { log.log(isRunning() ? Level.ERROR : Level.DEBUG, "socket is closing: {}", e.getShortMessage(new StringBuilder())); // log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e); break; } catch (FramsticksException e) { log.debug("{} caught exception in receiver thread {}", this, e.getMessage()); handle(e); } catch (Exception e) { log.log(isRunning() ? Level.ERROR : Level.DEBUG, "caught exception: ", e); break; } } log.debug("{} finished processing input", this); } protected abstract void receiverThreadRoutine(); // @SuppressWarnings("NN_NAKED_NOTIFY") protected void setupStreams() { try { output = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Encoding.getFramsticksCharset()), true); input = new BufferedReader(new InputStreamReader(socket.getInputStream(), Encoding.getFramsticksCharset())); synchronized (this) { this.notifyAll(); } } catch (IOException e) { throw new FramsticksException().msg("failed to setup streams").cause(e).arg("connection", this); } } @Override protected void joinableFinish() { try { if (output != null) { output.close(); output = null; } if (input != null) { input.close(); input = null; } if (socket != null) { socket.close(); socket = null; } } catch (Exception e) { log.error("failed to stop connection: ", e); } log.debug("connection closed"); } @Override public void childChangedState(Joinable joinable, JoinableState state) { proceedToState(state); } @Override public String getName() { return address != null ? description + " " + address : description; } @Override protected void joinableStart() { Dispatching.use(threads, this); senderThread.dispatch(new RunAt(this) { @Override protected void runAt() { synchronized (Connection.this) { while (state.equals(JoinableState.RUNNING) && output == null) { Dispatching.wait(Connection.this, 500); } } } }); receiverThread.dispatch(new RunAt(this) { @Override protected void runAt() { receiverThreadRoutine(); interruptJoinable(); // finishJoinable(); } }); } @Override protected void joinableInterrupt() { Dispatching.drop(threads, this); finishJoinable(); } @Override protected void joinableJoin() throws InterruptedException { Dispatching.join(threads); } protected static void startClientConnection(Connection connection) { while (connection.isRunning() && connection.socket == null) { log.debug("connecting to {}", connection.address); try { connection.socket = new Socket(connection.getAddressObject().getHostName(), connection.getAddressObject().getPort()); } catch (IOException e) { log.warn("{} failed to connect (retrying): {}", connection, e.getMessage()); Dispatching.sleep(0.5); } } log.debug("{} connected", connection); try { // connection.socket.setSoTimeout(500); connection.setupStreams(); } catch (Exception e) { throw new FramsticksException().msg("failed to initialize socket").cause(e).arg("connection", connection); } } /** * @return the address */ @ParamAnnotation public String getAddress() { return Strings.toStringNullProof(address, "?"); } public Address getAddressObject() { return address; } /** * @return the listeners */ public Collection getListeners() { return listeners; } /** * @return the handler */ public ExceptionResultHandler getExceptionHandler() { return exceptionHandler; } /** * @param handler the handler to set */ public void setExceptionHandler(ExceptionResultHandler handler) { this.exceptionHandler = handler; } public static T to(T connection, Address address) { connection.setAddress(address); return connection; } @Override public void handle(FramsticksException exception) { log.debug("{} handling {}", this, exception.getMessage()); exceptionHandler.handle(exception); } public Dispatcher getReceiverDispatcher() { return receiverThread; } public Dispatcher getSenderDispatcher() { return senderThread; } protected static String idToString(Integer id) { return id != null ? " " + id.toString() : ""; } protected final void putFile(File file, Integer outId) { putLine("file" + idToString(outId)/* + " " + f.getPath()*/); Source content = file.getContent(); String line; while ((line = content.readLine()) != null) { putLine(line); } putLine("eof"); } public final void sendFile(final String header, final File file, final Integer id, ExceptionResultHandler handler) { senderThread.dispatch(new RunAt(handler) { @Override protected void runAt() { if (header != null) { putLine(header); } putFile(file, id); flushOut(); } }); } public synchronized boolean isConnected() { return socket != null && socket.isConnected(); } }