package com.framsticks.running; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import com.framsticks.params.EventListener; import com.framsticks.params.EventListeners; import com.framsticks.params.ParamFlags; import com.framsticks.params.annotations.AutoAppendAnnotation; import com.framsticks.params.annotations.FramsClassAnnotation; import com.framsticks.params.annotations.ParamAnnotation; import com.framsticks.structure.messages.ValueChange; import com.framsticks.util.FramsticksException; import com.framsticks.util.dispatching.AbstractJoinable; import com.framsticks.util.dispatching.Dispatching; import com.framsticks.util.dispatching.Joinable; import com.framsticks.util.dispatching.JoinableParent; import com.framsticks.util.dispatching.JoinableState; import com.framsticks.util.dispatching.RunAt; import com.framsticks.util.dispatching.Thread; import com.framsticks.util.dispatching.ThrowExceptionHandler; import com.framsticks.util.io.Encoding; import com.framsticks.util.lang.Strings; @FramsClassAnnotation public class ExternalProcess extends AbstractJoinable implements JoinableParent { private static final Logger log = LogManager.getLogger(ExternalProcess.class); protected List arguments = new ArrayList<>(); protected Process process; protected Thread readerThread = new Thread(); protected PrintWriter input; protected BufferedReader output; protected Integer exitCode; protected String echoInput; protected String directory; protected String host; protected final EventListeners listeners = new EventListeners<>(); @AutoAppendAnnotation @ParamAnnotation(id = "line_output") public void addOutputListener(EventListener listener) { synchronized (listeners) { listeners.add(listener); } } @ParamAnnotation(id = "line_output") public void removeOutputListener(EventListener listener) { synchronized (listeners) { listeners.remove(listener); } } /** * */ public ExternalProcess() { super(); setName("process"); arguments.add(null); } /** * @return the command */ @ParamAnnotation(flags = ParamFlags.USERREADONLY) public String getCommand() { return arguments.get(0); } /** * @param command the command to set */ @ParamAnnotation public void setCommand(String command) { arguments.set(0, command); } protected void readerTask() { log.debug("reading output from " + this); String line; try { try { while ((line = output.readLine()) != null) { log.trace("read line: {}", line); synchronized (listeners) { listeners.actionForAll(new ValueChange(line)); } } } catch (IOException e) { throw new FramsticksException().msg("failed to read line from output of process").cause(e); } try { exitCode = process.waitFor(); } catch (InterruptedException e) { throw new FramsticksException().msg("failed to wait for process").cause(e); } log.info("process ended {}", this); // process = null; } catch (FramsticksException e) { log.error("exception caught in process {}", this, e); } interruptJoinable(); // finish(); } @ParamAnnotation(flags = ParamFlags.USERREADONLY) public void setDirectory(String directory) { this.directory = directory; } /** * @return the host */ public String getHost() { return host; } /** * @param host the host to set */ public void setHost(String host) { this.host = host; } @ParamAnnotation public String getDirectory() { return Strings.toStringNullProof(directory, "."); } @Override protected void joinableStart() { final ProcessBuilder builder = new ProcessBuilder(); builder.redirectErrorStream(true); if (host == null) { setDirectory(System.getProperties().get("user.home") + "/" + getDirectory()); setCommand(getDirectory() + "/" + getCommand()); builder.directory(new File(getDirectory())); } else { StringBuilder b = new StringBuilder(); setCommand("./" + getCommand()); for (String a : arguments) { b.append(" '").append(a).append("'"); } arguments = Arrays.asList("ssh", host, "-tt", ("cd " + getDirectory() + " &&" + b.toString())); } log.info("running process {}", this); builder.command(arguments); try { process = builder.start(); input = new PrintWriter(new OutputStreamWriter(process.getOutputStream(), Encoding.getDefaultCharset())); output = new BufferedReader(new InputStreamReader(process.getInputStream(), Encoding.getDefaultCharset())); } catch (IOException e) { throw new FramsticksException().msg("failed to start process").cause(e); } readerThread.dispatch(new RunAt(ThrowExceptionHandler.getInstance()) { @Override protected void runAt() { readerTask(); } }); Dispatching.use(readerThread, this); if (echoInput != null) { input.println(echoInput); input.flush(); } } @Override public String toString() { return getName() + arguments; } /** * @return the input */ public PrintWriter getInput() { return input; } /** * @return the echoInput */ @ParamAnnotation(flags = ParamFlags.USERREADONLY) public String getEchoInput() { return echoInput; } /** * @param echoInput the echoInput to set */ @ParamAnnotation public void setEchoInput(String echoInput) { this.echoInput = echoInput; } @Override protected void joinableInterrupt() { process.destroy(); Dispatching.drop(readerThread, this); // finish(); } @Override @ParamAnnotation(flags = ParamFlags.USERREADONLY) public String getName() { return readerThread.getName(); } /** * @param name the name to set */ @ParamAnnotation public void setName(String name) { readerThread.setName(name); } @Override protected void joinableFinish() { } @Override protected void joinableJoin() throws InterruptedException { Dispatching.join(readerThread); } @Override public void childChangedState(Joinable joinable, JoinableState state) { proceedToState(state); } }