Source code for ewoksid02.tasks.dahuprocessingtask

# -*- coding: utf-8 -*-

import json
import os

import gevent
import tango
from ewokscore import Task

# Deprecated: to be replaced by the Id02ProcessingTask child classes.


class SingleDetector(
    Task, input_names=["parameters"], output_names=["files", "runtime", "logging"]
):
    """
    Trigger DAHU in the workflow.

    The ``parameters`` input is serialised to JSON and submitted as an
    ``id02.singledetector`` job to the first DAHU Tango device that accepts
    it. The ``files``, ``runtime`` and ``logging`` outputs are filled from the
    ``files``, ``job_runtime`` and ``logging`` keys of the job output.
    """

    def tango_execute(self, N, *args, **kwargs):
        """Run a Tango command on the current proxy, retrying on failure.

        The positional and keyword arguments are forwarded unchanged to
        ``self._proxy.command_inout``.

        :param N: maximum number of attempts.
        :returns: the value returned by ``command_inout`` (``None`` when
            ``N`` is not positive, since no attempt is made).
        :raises tango.CommunicationFailed: when the last attempt fails.
        """
        for i in range(N):
            try:
                return self._proxy.command_inout(*args, **kwargs)
            except tango.CommunicationFailed:
                if i == N - 1:
                    raise
                # Quadratic back-off between attempts: 0 s, 1 s, 4 s, ...
                gevent.sleep(i**2)

    def run(self):
        """Submit the job to DAHU, wait for it and publish its outputs.

        :raises RuntimeError: when no DAHU server accepts the job, or when
            the job output contains an ``error`` entry.
        """
        proxies = ["dau/dahu/2", "dau/dahu/4", "dau/dahu/5", "dau/dahu/1", "dau/dahu/3"]
        # Shallow copy: "dahu_proxy" is written below and must not leak into
        # the object supplied by the caller as task input.
        params = dict(self.inputs["parameters"])

        # Create the output dir as DAHU has trouble to do it properly
        if "output_dir" in params:
            os.makedirs(params["output_dir"], exist_ok=True)

        # Try each server in turn; the first successful submission wins.
        for p in proxies:
            try:
                params["dahu_proxy"] = p
                self._proxy = tango.DeviceProxy(p)
                pset = ["id02.singledetector", json.dumps(params)]
                jobid = self.tango_execute(2, "startJob", pset)
            except Exception:
                # Deliberately broad: any failure simply moves on to the next
                # server in the list.
                print(f"Unable to submit job on {p}")
                continue
            break
        else:
            raise RuntimeError("Unable to find a DAHU server to submit the job!")

        print(f"DAHU job id: {jobid} on {p}")
        gevent.sleep(2)
        self.tango_execute(30, "waitJob", jobid)
        ret = json.loads(self.tango_execute(5, "getJobOutput", jobid))

        # (task output name, key in the DAHU job output)
        output_keys = (
            ("files", "files"),
            ("runtime", "job_runtime"),
            ("logging", "logging"),
        )

        if "error" in ret:
            # Handle the failure first: the output of a failed job may lack
            # some result keys, and a KeyError here would hide the real
            # DAHU error. Whatever was reported is still recorded.
            for output_name, key in output_keys:
                if key in ret:
                    self.outputs[output_name] = ret[key]
            print(ret["error"])
            raise RuntimeError(f"DAHU job failed. \n{ret['error']}")

        for output_name, key in output_keys:
            self.outputs[output_name] = ret[key]