# Source code for ewoksid02.tasks.dahuprocessingtask
# -*- coding: utf-8 -*-
import json
import os
import gevent
import tango
from ewokscore import Task
# Deprecated: to be replaced by the Id02ProcessingTask subclasses.
class SingleDetector(
    Task, input_names=["parameters"], output_names=["files", "runtime", "logging"]
):
    """
    Trigger DAHU in the workflow.

    Submits an ``id02.singledetector`` job to one of a fixed list of DAHU
    Tango devices, waits for it and publishes parts of the job output.

    Inputs:
        parameters: mapping of job parameters. It is serialized to JSON and
            sent with the ``startJob`` command. When it contains an
            ``output_dir`` entry, that directory is created beforehand.

    Outputs:
        files: the ``files`` entry of the job output.
        runtime: the ``job_runtime`` entry of the job output.
        logging: the ``logging`` entry of the job output.
    """

    def tango_execute(self, N, *args, **kwargs):
        """
        Run a Tango command on ``self._proxy``, retrying on communication failures.

        :param N: maximum number of attempts, must be at least 1.
        :param args: positional arguments forwarded to ``command_inout``.
        :param kwargs: keyword arguments forwarded to ``command_inout``.
        :returns: the value returned by ``command_inout``.
        :raises ValueError: if ``N`` is smaller than 1 (nothing would be executed).
        :raises tango.CommunicationFailed: if the last attempt still fails.
        """
        if N < 1:
            raise ValueError(f"N must be at least 1, got {N}")
        for i in range(N):
            try:
                return self._proxy.command_inout(*args, **kwargs)
            except tango.CommunicationFailed:
                if i == N - 1:
                    raise
                # Quadratic back-off between attempts: 0 s, 1 s, 4 s, ...
                gevent.sleep(i**2)

    def run(self):
        """
        Submit the job to the first DAHU device that accepts it and collect the output.

        :raises RuntimeError: if no device accepted the job, or if the job
            output contains an ``error`` entry.
        """
        proxies = ["dau/dahu/2", "dau/dahu/4", "dau/dahu/5", "dau/dahu/1", "dau/dahu/3"]
        # Shallow copy: the proxy name is injected below and the task input
        # itself must not be modified.
        params = dict(self.inputs["parameters"])
        # Create the output dir as DAHU has trouble to do it properly
        if "output_dir" in params:
            os.makedirs(params["output_dir"], exist_ok=True)

        last_error = None
        for p in proxies:
            try:
                params["dahu_proxy"] = p
                self._proxy = tango.DeviceProxy(p)
                pset = ["id02.singledetector", json.dumps(params)]
                jobid = self.tango_execute(2, "startJob", pset)
            except Exception as exc:
                print(f"Unable to submit job on {p}")
                # Remember the failure so it can be attached to the final error.
                last_error = exc
                continue
            break
        else:
            raise RuntimeError(
                "Unable to find a DAHU server to submit the job!"
            ) from last_error

        print(f"DAHU job id: {jobid} on {p}")
        gevent.sleep(2)
        self.tango_execute(30, "waitJob", jobid)
        ret = json.loads(self.tango_execute(5, "getJobOutput", jobid))

        if "error" in ret:
            # A failed job may not provide every entry: publish what is there
            # and report the DAHU error instead of failing on a missing key.
            for output_name, key in (
                ("files", "files"),
                ("runtime", "job_runtime"),
                ("logging", "logging"),
            ):
                if key in ret:
                    self.outputs[output_name] = ret[key]
            print(ret["error"])
            raise RuntimeError(f"DAHU job failed. \n{ret['error']}")

        self.outputs["files"] = ret["files"]
        self.outputs["runtime"] = ret["job_runtime"]
        self.outputs["logging"] = ret["logging"]