package com.framsticks.experiment; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.Map; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import com.framsticks.params.EventListener; import com.framsticks.params.MessageLogger; import com.framsticks.params.SimplePrimitive; import com.framsticks.params.annotations.FramsClassAnnotation; import com.framsticks.params.annotations.ParamAnnotation; import com.framsticks.structure.messages.Message; import com.framsticks.structure.messages.ValueChange; import com.framsticks.util.dispatching.FutureHandler; import com.framsticks.util.dispatching.Future; @FramsClassAnnotation(order = {"netLoadSaveLogic", "newestTaskScheduled", "newestResultReceived"}) public abstract class WorkPackageLogic> extends AbstractExperimentLogic { private static final Logger log = LogManager.getLogger(WorkPackageLogic.class); @ParamAnnotation public final NetLoadSaveLogic netLoadSaveLogic; protected final Class packageJavaClass; protected final MessageLogger messages = new MessageLogger(WorkPackageLogic.class); protected final Map sentPackages = new IdentityHashMap<>(); protected final LinkedList queuedPackages = new LinkedList<>(); protected final SimplePrimitive newestTaskSent = new SimplePrimitive<>(); protected final SimplePrimitive newestResultReceived = new SimplePrimitive<>(); /** * @param experiment */ public WorkPackageLogic(Experiment experiment, Class packageJavaClass) { super(experiment); this.packageJavaClass = packageJavaClass; netLoadSaveLogic = new NetLoadSaveLogic(experiment, packageJavaClass) { @Override public void netload(final Simulator simulator, final FutureHandler netFuture) { assert experiment.isActive(); log.debug("providing netload file for {}", simulator); findNextPackage(new Future (netFuture) { @Override protected void result(WP net) { assert experiment.isActive(); if (net == null) { log.debug("no more packages left"); return; } WorkPackageLogic.this.messages.info("netload", net.sumUpTask()); log.debug("sending package: {}", net); newestTaskSent.set(net.sumUpTask()); sentPackages.put(simulator, net); netFuture.pass(net); } }); } @Override public void netsave(Simulator simulator, WP netResult) { assert experiment.isActive(); log.info("received package from {}: {}", simulator, netResult); WorkPackageLogic.this.messages.info("netsave", netResult.toString()); WP netSent = sentPackages.get(simulator); if (netSent == null) { log.error("no task found in {} for received result {}", simulator, netResult); return; } sentPackages.remove(simulator); try { WP netRemainder = netSent.getRemainder(netResult); if (netRemainder != null) { log.warn("queueing remainder: {}", netRemainder); queuedPackages.add(netRemainder); } } catch (InvalidWorkPackage e) { log.error("in simulator {}, result {} is in no relation to task {} ({}), rescheduling", simulator, netResult, netSent, e.getShortMessage(new StringBuilder())); queuedPackages.add(netSent); return; } returnPackage(netResult); // processFile(); } }; } protected void findNextPackage(final FutureHandler future) { if (!queuedPackages.isEmpty()) { WP workPackage = queuedPackages.pollFirst(); future.pass(workPackage); return; } generateNextPackage(new Future(experiment) { @Override protected void result(WP result) { if (result == null) { log.info("no more packages left"); future.pass(null); return; } future.pass(result); } }); } protected abstract void generateNextPackage(FutureHandler future); protected abstract void returnPackage(WP workPackage); /** * @return the newestTaskScheduled */ @ParamAnnotation(id = "newest_task_sent") public String getNewestTaskSent() { return newestTaskSent.get(); } /** * @return the newestResultReceived */ @ParamAnnotation(id = "newest_result_received") public String getNewestResultReceived() { return newestResultReceived.get(); } @ParamAnnotation(id = "newest_task_sent_changed") public void addNewestTaskSentListener(EventListener listener) { newestTaskSent.addListener(listener); } @ParamAnnotation(id = "newest_task_sent_changed") public void removeNewestTaskSentListener(EventListener listener) { newestTaskSent.removeListener(listener); } @ParamAnnotation(id = "newest_result_received_changed") public void addNewestResultReceivedListener(EventListener listener) { newestResultReceived.addListener(listener); } @ParamAnnotation(id = "newest_result_received_changed") public void removeNewestResultReceivedListener(EventListener listener) { newestResultReceived.removeListener(listener); } @ParamAnnotation(id = "messages") public void addMessageListener(EventListener listener) { messages.add(listener); } @ParamAnnotation(id = "messages") public void removeMessageListener(EventListener listener) { messages.remove(listener); } }