from typing import List, Any, Union
import pathlib
import re
import os
from abc import abstractmethod
from litesoph.common.job_submit import SubmitNetwork, SubmitLocal
from litesoph.common.data_sturcture.data_classes import TaskInfo
import uuid
class TaskError(RuntimeError):
    """Base class of error types related to any TASK."""
class TaskSetupError(TaskError):
    """Calculation cannot be performed with the given parameters.

    Typically raised before a calculation.
    """
class InputError(TaskSetupError):
    """Raised if inputs given to the calculator were incorrect.

    Bad input keywords or values, or missing pseudopotentials.

    This may be raised before or during calculation, depending on
    when the problem is detected.
    """
class TaskFailed(TaskError):
    """Calculation failed unexpectedly.

    Reasons to raise this error are:
      * Calculation did not converge
      * Calculation ran out of memory
      * Segmentation fault or other abnormal termination
      * Arithmetic trouble (singular matrices, NaN, ...)

    Typically raised during calculation.
    """
class ReadError(TaskError):
    """Unexpected irrecoverable error while reading calculation results."""
class TaskNotImplementedError(NotImplementedError):
    """Raised if a calculator does not implement the requested property."""
class PropertyNotPresent(TaskError):
    """Requested property is missing.

    Maybe it was never calculated, or for some reason was not extracted
    with the rest of the results, without being a fatal ReadError.
    """
class Task:
    """Base class for all tasks.

    All the tasks in litesoph inherit from this class. This class is
    interfaced with the SubmitLocal and SubmitNetwork classes.

    Parameters
    ----------
    lsconfig:
        The configuration dictionary that has information about the paths
        of the engine binaries.
    task_info:
        The empty TaskInfo object used to store all the data generated
        by the task.
    dependent_tasks:
        A list of TaskInfo objects of other tasks that this task is
        dependent on.
    """

    # NOTE(review): no ``id`` is defined in the visible class or module scope,
    # so this f-string presumably interpolates the built-in ``id`` function and
    # yields a name containing "<built-in function id>". The value is left
    # unchanged because other code may rely on it -- confirm the intended id.
    BASH_filename = f'ls_job_script_{id}.sh'
    # First line written to every generated job script.
    job_script_first_line = "#!/bin/bash"
    # Command used as the completion marker for remote job scripts.
    remote_job_script_last_line = "touch Done"
def __init__(self, lsconfig,
task_info: TaskInfo,
dependent_tasks: Union[List[TaskInfo],None]= None
) -> None:
# Stores the configuration and the TaskInfo record on the instance, then
# looks up the engine, MPI launcher and python executables in lsconfig.
# NOTE(review): this block reached review without indentation and with
# parts of three statements missing; each gap is marked below.
self.lsconfig = lsconfig
self.task_info = task_info
# NOTE(review): the right-hand side of this assignment is missing;
# presumably the task's name taken from task_info -- confirm.
self.task_name =
# NOTE(review): as written, this chained assignment binds BOTH
# self.dependent_tasks and the local dependent_tasks to task_info.path,
# discarding the dependent_tasks argument. It looks like two separate
# assignments were fused (one storing dependent_tasks, one storing
# task_info.path on another attribute) -- confirm.
self.dependent_tasks = dependent_tasks = task_info.path
# self.project_dir is deprecated and should be removed.
# NOTE(review): the right-hand side of this assignment is missing.
self.project_dir =
self.engine_name = task_info.engine
# Engine executable: the lsconfig['engine'] entry for this engine, falling
# back to the bare engine name.
self.engine_path = self.lsconfig['engine'].get(self.engine_name , self.engine_name)
# MPI launcher: an engine-specific '<engine>_mpi' entry wins, then the
# generic 'mpirun' entry, then the literal command name 'mpirun'.
mpi_path = self.lsconfig['mpi'].get('mpirun', 'mpirun')
self.mpi_path = self.lsconfig['mpi'].get(f'{self.engine_name}_mpi', mpi_path)
# Python interpreter: lsconfig['programs']['python'], defaulting to 'python'.
self.python_path = self.lsconfig['programs'].get('python', 'python')
def reset_lsconfig(self, lsconfig):
    """Re-read the engine and MPI launcher paths from ``lsconfig``.

    Only ``self.engine_path`` and ``self.mpi_path`` are updated; no other
    attribute is touched by this method.

    Parameters
    ----------
    lsconfig:
        Mapping with an ``'engine'`` and an ``'mpi'`` sub-mapping.
    """
    # Fall back to the bare engine name when no path is configured for it.
    self.engine_path = lsconfig['engine'].get(self.engine_name, self.engine_name)
    # An engine-specific '<engine>_mpi' entry takes precedence over the
    # generic 'mpirun' entry, which itself defaults to the command 'mpirun'.
    default_mpi = lsconfig['mpi'].get('mpirun', 'mpirun')
    self.mpi_path = lsconfig['mpi'].get(f'{self.engine_name}_mpi', default_mpi)
def create_directory(directory):
    """Create ``directory`` (including missing parents) if it is not a directory yet.

    Parameters
    ----------
    directory:
        Path of the directory to create, as a string or path-like object.

    Raises
    ------
    FileExistsError
        If the path exists but is not a directory.
    """
    # NOTE(review): this function takes no ``self``; presumably it is meant
    # to be used as a static method -- confirm.
    # NOTE(review): the statement that creates the directory was not present
    # in the reviewed text; os.makedirs is assumed from the function name.
    absdir = os.path.abspath(directory)
    # Wrap the argument in a Path so plain strings work with is_dir(). An
    # existing directory (the current working directory included) is left
    # untouched, so no separate comparison against the cwd is needed.
    if not pathlib.Path(absdir).is_dir():
        os.makedirs(absdir, exist_ok=True)
def create_template(self):
    """Create the engine input and store it in the task info.

    Store the engine input text in
    ``task_info.input['engine_input']['data']`` and its file path in
    ``task_info.input['engine_input']['path']``.

    The base implementation does nothing and returns ``None``.
    """
def create_input(self):
    """Mark the task input as created by setting ``task_info.state.input_created``."""
    self.task_info.state.input_created = True
def save_input(self):
    """Mark the task input as saved by setting ``task_info.state.input_saved``."""
    self.task_info.state.input_saved = True
def prepare_input(self):
# NOTE(review): no body is visible for this method in the reviewed text, so
# its behaviour cannot be documented. Presumably it drives create_input()
# and save_input() -- confirm against the full source.
def get_engine_input(self):
    """Return the engine input stored under ``task_info.input['engine_input']['data']``.

    Raises
    ------
    KeyError
        If the ``'engine_input'`` or ``'data'`` entry is absent.
    """
    return self.task_info.input['engine_input']['data']
def set_engine_input(self, txt):
    """Store ``txt`` under ``task_info.input['engine_input']['data']``.

    The ``'engine_input'`` entry must already exist; otherwise the lookup
    raises ``KeyError``.
    """
    self.task_info.input['engine_input']['data'] = txt
def check_prerequisite(self, *_) -> bool:
    """Return ``True`` unconditionally; any positional arguments are ignored."""
    # This function is deprecated and should not be used.
    return True
def create_job_script(self) -> list:
    """Create the bash script that runs the job.

    The script is meant to carry a "touch Done" command so that it is
    possible to know when the command has completed. The base
    implementation returns a new, empty list of script lines.
    """
    job_script = []
    return job_script
def write_job_script(self, job_script=None):
# Opens the job-script file, whose name ends in BASH_filename, for writing.
# A truthy job_script argument replaces self.job_script first; otherwise the
# existing self.job_script is kept.
if job_script:
self.job_script = job_script
# NOTE(review): the left operand of "/" is missing here; presumably the
# task's directory as a pathlib.Path -- confirm.
self.bash_file = / self.BASH_filename
# newline='\n' turns off newline translation on write, so the script keeps
# LF line endings regardless of platform.
with open(self.bash_file, 'w+', newline='\n') as f:
# NOTE(review): the body of this with-block is not visible; presumably it
# writes self.job_script to f -- confirm.
def add_proper_path(self, path):
"""This replaces the local paths with remote paths in the engine input."""
engine_input = self.task_info.input.get('engine_input', None)
# NOTE(review): the bodies of the two guards below are not visible;
# presumably each one returns early, since engine_input.get() would fail on
# a missing engine_input -- confirm.
if not engine_input:
template = engine_input.get('data', None)
if not template:
# NOTE(review): the argument of the first str( call on the next two lines
# is missing; presumably the local directory being replaced -- confirm.
if str( in template:
# NOTE(review): re.sub treats its first argument as a regular expression,
# while the membership test above is a literal substring check. A path
# containing regex metacharacters would therefore not be replaced
# literally; consider re.escape(...) or str.replace.
text = re.sub(str(, str(path), template)
# The rewritten text replaces the stored engine input.
self.task_info.input['engine_input']['data'] = text
def set_submit_local(self, *args):
    """Create a ``SubmitLocal`` for this task and keep it on ``self.submit_local``.

    The task itself is passed as the first argument; any extra positional
    arguments are forwarded unchanged.
    """
    self.submit_local = SubmitLocal(self, *args)
def run_job_local(self,cmd):
    """Build the local run command by appending the job-script file name to ``cmd``.

    Parameters
    ----------
    cmd:
        Command prefix (string) that the job-script name is appended to,
        separated by one space.
    """
    # NOTE(review): no visible statement uses the assembled command; the call
    # that launches it (presumably through self.submit_local) appears to be
    # missing from the reviewed text -- confirm.
    cmd = cmd + ' ' + self.BASH_filename
def connect_to_network(self, *args, **kwargs):
    """Create a ``SubmitNetwork`` for this task and keep it on ``self.submit_network``.

    The task itself is passed as the first argument; all other positional
    and keyword arguments are forwarded unchanged.
    """
    self.submit_network = SubmitNetwork(self, *args, **kwargs)
def read_log(self, file):
    """Return the complete text content of ``file``.

    Parameters
    ----------
    file:
        Path of the log file to read.

    Raises
    ------
    OSError
        If the file cannot be opened (e.g. it does not exist).
    """
    # NOTE(review): the right-hand side of the assignment was missing in the
    # reviewed text; reading the whole file is assumed from the context.
    with open(file , 'r') as f:
        text = f.read()
    return text
def check_output(self):
    """Check that the job has produced a return code.

    For a job submitted in ``'remote'`` mode the submit return code is
    inspected; for any other mode the job return code is used. Only the
    presence of a return code is checked: any value other than ``None``,
    including a non-zero one, counts as completed.

    Returns
    -------
    bool
        ``True`` when a return code is available.

    Raises
    ------
    TaskFailed
        If the relevant return code is ``None``.
    """
    job_info = self.task_info.job_info
    # The two return codes are alternatives: without the ``else`` the job
    # return code would always overwrite the remote submit return code.
    if job_info.submit_mode == 'remote':
        check = job_info.submit_returncode
    else:
        check = job_info.job_returncode
    if check is None:
        raise TaskFailed("Job not completed.")
    return True
def write2file(directory,filename, template) -> None:
    """Write template to a file.

    Parameters
    ----------
    directory: str
        full path of the directory to write to.
    filename: str
        name of the file with extension
    template: str
        script template which needs to be written to file.

    Raises
    ------
    PermissionError
        If the file does not exist and its parent directory is not
        writable, or if the file exists but is not writable.
    """
    filename = pathlib.Path(directory) / filename
    file_exists = os.access(filename, os.F_OK)
    parent_writeable = os.access(filename.parent, os.W_OK)
    file_writeable = os.access(filename, os.W_OK)
    # Check access up front so the caller gets one uniform error message
    # for both the "cannot create" and the "cannot overwrite" case.
    if ((not file_exists and not parent_writeable) or
            (file_exists and not file_writeable)):
        msg = f'Permission denied acessing file: {filename}'
        raise PermissionError(msg)
    # NOTE(review): the write statement was missing in the reviewed text;
    # writing ``template`` is taken from the docstring's stated purpose.
    with open(filename, 'w+') as f:
        f.write(template)
def assemable_job_cmd(job_id: str = '', engine_cmd: str = None, np: int = 1, cd_path: str = None,
                      mpi_path: str = None,
                      remote: bool = False,
                      scheduler_block: str = None,
                      module_load_block: str = None,
                      extra_block: str = None) -> str:
    """Assemble the text of a bash job script.

    Parameters
    ----------
    job_id:
        Suffix used in the ``Start_<job_id>`` / ``Done_<job_id>`` marker files.
    engine_cmd:
        Command that runs the engine; omitted from the script when empty.
    np:
        Number of processes; values above 1 run the engine through MPI.
    cd_path:
        Directory the script changes into before running the engine.
    mpi_path:
        MPI launcher; defaults to ``'mpirun'`` when needed and not given.
    remote:
        When true, the scheduler/module blocks and the final ``Done``
        marker are included.
    scheduler_block, module_load_block:
        Optional text blocks placed right after the shebang (remote only).
    extra_block:
        Optional text block placed after the engine command.

    Returns
    -------
    str
        The script lines joined with newlines (no trailing newline).
    """
    # NOTE(review): several statement bodies were missing in the reviewed
    # text. Appending each optional block, the plain (non-MPI) engine command
    # and the final Done marker is reconstructed from the otherwise unused
    # parameters and locals -- confirm against the full source.
    job_script_first_line = "#!/bin/bash"
    remote_job_script_last_line = f"touch Done_{job_id}"

    job_script = [job_script_first_line]

    if remote:
        if scheduler_block:
            job_script.append(scheduler_block)
        if module_load_block:
            job_script.append(module_load_block)

    if cd_path:
        job_script.append(f'cd {cd_path}')
        # NOTE(review): assumed to belong with the ``cd`` so that the Start
        # marker lands in the job directory -- confirm.
        job_script.append(f'touch Start_{job_id}')

    if engine_cmd:
        if np > 1:
            if not mpi_path:
                mpi_path = 'mpirun'
            job_script.append(f'{mpi_path} -np {np:d} {engine_cmd}')
        else:
            # A single process runs the engine directly, without MPI.
            job_script.append(engine_cmd)

    if extra_block:
        job_script.append(extra_block)

    if remote:
        job_script.append(remote_job_script_last_line)

    return '\n'.join(job_script)
def pbs_job_script(name):
    """Return the PBS directive header for a job called ``name``.

    The header requests one node with 4 CPUs and 4 MPI processes on the
    ``debug`` queue with a 30 minute wall-time limit, and sends stdout and
    stderr to ``output.txt`` and ``error.txt``. The returned text starts and
    ends with a newline.
    """
    # The directives start at column 0 inside the literal because PBS only
    # recognises "#PBS" at the beginning of a line.
    head_job_script = f"""
#PBS -N {name}
#PBS -o output.txt
#PBS -e error.txt
#PBS -l select=1:ncpus=4:mpiprocs=4
#PBS -q debug
#PBS -l walltime=00:30:00
"""
    return head_job_script