# Source code for litesoph.common.data_sturcture.data_classes

from dataclasses import dataclass, field, asdict
import copy
from pathlib import Path
from typing import Any, Dict, List, Union
import json
import os
import uuid
from litesoph.common.data_sturcture.utils import WorkflowInfoEncoder

@dataclass
class State:
    """This class stores the state of jobs, tasks and workflows.

    The class declares no fields of its own; state entries are attached to
    the instance as ordinary attributes (see ``from_dict``).
    """

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Create a state object whose attributes mirror ``data``.

        parameters
        ----------
        data:
            Mapping of attribute name to attribute value.
        """
        state = cls()
        for key, value in data.items():
            setattr(state, key, value)
        return state

    def to_dict(self) -> Dict[str, Any]:
        """Return the state as a plain dictionary.

        ``asdict`` only reports declared dataclass fields, so attributes that
        were attached dynamically (e.g. by ``from_dict``) are added
        explicitly; otherwise they would be dropped from the result.
        """
        result = asdict(self)
        for key, value in vars(self).items():
            if key not in result:
                # Deep copy so the returned dict does not alias live state.
                result[key] = copy.deepcopy(value)
        return result

def factory_state():
    """Return a new, empty ``State`` (used as a dataclass ``default_factory``)."""
    new_state = State()
    return new_state

@dataclass
class Info:
    """Base class for info objects which require a unique id.

    parameters
    ----------
    _uuid:
        Unique identifier of the object. It is exposed read-only through
        the ``uuid`` property.
    """

    _uuid: str

    @property
    def uuid(self):
        """The unique id of the object (read-only)."""
        return self._uuid

    @uuid.setter
    def uuid(self, value):
        # The id is fixed when the object is created; reassignment is refused.
        raise AttributeError('Denied')

    def to_dict(self) -> Dict[str, Any]:
        """Return the dataclass fields as a dictionary (via ``asdict``)."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialise the object to a JSON string using ``WorkflowInfoEncoder``."""
        return json.dumps(self, cls=WorkflowInfoEncoder, indent=3)

    # def save(self, fp):
    #     try:
    #         json_txt = self.to_json()
    #     except TypeError:
    #         raise
    #     fp.write(json_txt)

@dataclass
class JobInfo:
    """This class stores information about a job.

    parameters
    ----------
    id:
        The id of the job.
    directory:
        The directory in which the job is running.
    state:
        State of the job.
    job_script:
        Bash script to run the job.
    submit_mode:
        Whether the job was submitted locally or to a remote machine.
    job_returncode:
        Return code of the job.
    submit_returncode:
        Return code of job submitting to remote machine.
    submit_output:
        Output of job submission to remote machine.
    submit_error:
        Errors of job submission to remote machine.
    output:
        Output of the job.
    error:
        Error of the job.
    """

    id: Union[str, None] = field(default=None)
    directory: Union[Path, None] = field(default=None)
    state: Union[str, None] = field(default=None)
    job_script: Union[str, None] = field(default=None)
    submit_mode: Union[str, None] = field(default='local')
    job_returncode: Union[int, None] = field(default=None)
    submit_returncode: Union[int, None] = field(default=None)
    submit_output: Union[str, None] = field(default=None)
    submit_error: Union[str, None] = field(default=None)
    output: Union[str, None] = field(default=None)
    error: Union[str, None] = field(default=None)

    @classmethod
    def from_dict(cls, data):
        """Create a ``JobInfo`` and set every key of ``data`` as an attribute.

        A ``directory`` value given as a string is converted to ``Path``.
        Keys that are absent from ``data`` keep their field defaults.
        """
        job_info = cls()
        for key, value in data.items():
            if key == 'directory' and isinstance(value, str):
                value = Path(value)

            setattr(job_info, key, value)
        return job_info

@dataclass
class Block:
    """This class stores information about a block in the workflow.
    The block is a collection of tasks.

    parameters
    ----------
    name:
        The name of the block.
    store_same_task_type:
        True if all tasks in the block are of same type.
    task_type:
        The type of task if "store_same_task_type" is true else None.
    task_uuids:
        The list of task ids associated with the block.
    metadata:
        Any optional information about the tasks.
    """

    name: str
    store_same_task_type: bool = False
    task_type: Union[str, None] = field(default=None)
    task_uuids: List[str] = field(default_factory=list)
    metadata: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data):
        """Create a block from ``data``.

        ``name`` is required (``KeyError`` if missing); every other key
        falls back to the corresponding field default.
        """
        return cls(name=data['name'],
                   store_same_task_type=data.get('store_same_task_type', False),
                   task_type=data.get('task_type', None),
                   task_uuids=data.get('task_uuids', list()),
                   metadata=data.get('metadata', dict()))

    def clone(self):
        """Return an independent copy of this block.

        The copy is rebuilt from ``to_dict``, whose ``asdict`` call copies the
        list and dict fields, so the clone shares no containers with ``self``.
        """
        data = self.to_dict()
        return Block.from_dict(data)

    def to_dict(self):
        """Return the dataclass fields as a dictionary (via ``asdict``)."""
        return asdict(self)

def factory_job_info():
    """Return a new ``JobInfo`` with default values (used as a dataclass ``default_factory``)."""
    new_job_info = JobInfo()
    return new_job_info
@dataclass
class TaskInfo(Info):
    """This class stores all the information about a task.

    parameters
    ----------
    name:
        Task identifier.
    engine:
        Which engine to use to run the task.
    state:
        Stores information about the state of the task.
    path:
        Path of the workflow directory.
    task_data:
        Stores any miscellaneous information about the task that depends
        on the engine.
    param:
        The input parameters of the task.
    engine_param:
        The input parameters of the task in the format of the engine used.
    input:
        A dictionary that stores input files generated for the task.
    output:
        Stores output files generated by the task.
    local_copy_files:
        List of relative paths of files to be copied to clone a task.
    remote_copy_files:
        List of relative paths of files to be copied from the remote machine.
    job_info:
        Contains all the information about submitting and running the job.
        (This is a new feature in development.)
    network:
        Contains information about the job that was submitted to network.
    local:
        Contains information about the job that was submitted locally.

    (network and the local variable will be removed once the job_info is
    incorporated.)
    """

    _name: str
    engine: Union[str, None] = field(default=None)
    state: State = field(default_factory=factory_state)
    path: Union[Path, None] = field(default=None)
    task_data: Dict[Any, Any] = field(default_factory=dict)
    param: Dict[Any, Any] = field(default_factory=dict)
    engine_param: Dict[Any, Any] = field(default_factory=dict)
    input: Dict[Any, Any] = field(default_factory=dict)
    output: Dict[Any, Any] = field(default_factory=dict)
    local_copy_files: List[Any] = field(default_factory=list)
    remote_copy_files: List[Any] = field(default_factory=list)
    job_info: JobInfo = field(default_factory=factory_job_info)
    network: Dict[Any, Any] = field(default_factory=dict)
    local: Dict[Any, Any] = field(default_factory=dict)

    @property
    def name(self):
        """The task identifier (read-only)."""
        return self._name

    @name.setter
    def name(self, value):
        # The name is fixed when the task is created; reassignment is refused.
        raise AttributeError('Denied')

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Create a ``TaskInfo`` from a dictionary.

        ``_uuid``, ``_name``, ``task_data`` and ``engine_param`` are required
        (``KeyError`` if missing); the remaining keys fall back to defaults.
        A ``path`` value is converted to ``Path`` and a ``job_info`` given as
        a dictionary is converted with ``JobInfo.from_dict``.
        """
        path = data.get('path', None)
        if path is not None:
            path = Path(path)

        # ``asdict`` writes these lists under the field names
        # (``*_copy_files``); the ``*_copy_list`` spelling is still accepted
        # as a fallback so dictionaries using the older keys keep loading.
        local_copy_files = data.get('local_copy_files',
                                    data.get('local_copy_list', list()))
        remote_copy_files = data.get('remote_copy_files',
                                     data.get('remote_copy_list', list()))

        job_info = data.get('job_info', JobInfo())
        if isinstance(job_info, dict):
            job_info = JobInfo.from_dict(job_info)

        return cls(_uuid=data['_uuid'],
                   _name=data['_name'],
                   path=path,
                   engine=data.get('engine', None),
                   state=State.from_dict(data.get('state', dict())),
                   param=data.get('param', dict()),
                   input=data.get('input', dict()),
                   output=data.get('output', dict()),
                   local_copy_files=local_copy_files,
                   remote_copy_files=remote_copy_files,
                   task_data=data['task_data'],
                   engine_param=data['engine_param'],
                   job_info=job_info,
                   network=data.get('network', dict()),
                   local=data.get('local', dict()))

    def clone(self, task_info):
        """Copy every attribute of this task except ``_uuid`` onto ``task_info``.

        Values are deep-copied so the two objects share no mutable state.
        ``task_info`` is modified in place and returned.
        """
        for key, value in self.__dict__.items():
            if key == '_uuid':
                # The target keeps its own identity.
                continue
            setattr(task_info, key, copy.deepcopy(value))
        return task_info
@dataclass
class Container:
    """This class stores information about a task in the context of the workflow.
    Each container can be associated with only one task.

    parameters
    ----------
    id:
        Index of the container in the containers list.
    block_id:
        Index of the block which the task belongs to.
    task_type:
        The type of the task it is associated with.
    task_uuid:
        The uuid of the task it is associated with.
    workflow_uuid:
        The uuid of the workflow the task is present in.
    parameters:
        A dictionary that contains the parameters of the task in the
        context of the workflow.
    env_parameters:
        Stores any miscellaneous information in context with the workflow.
    next:
        Stores uuid of the next task.
    previous:
        Stores uuid of the previous task.
    """

    id: int
    block_id: int
    task_type: str
    task_uuid: str
    workflow_uuid: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    env_parameters: Dict[str, Any] = field(default_factory=dict)
    next: Union[str, None] = field(default=None)
    previous: Union[str, None] = field(default=None)

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Create a container from ``data``.

        ``id``, ``block_id``, ``task_type``, ``task_uuid`` and
        ``workflow_uuid`` are required (``KeyError`` if missing); the other
        keys fall back to the field defaults.
        """
        return cls(id=data['id'],
                   block_id=data['block_id'],
                   task_type=data['task_type'],
                   task_uuid=data['task_uuid'],
                   workflow_uuid=data['workflow_uuid'],
                   parameters=data.get('parameters', dict()),
                   env_parameters=data.get('env_parameters', dict()),
                   next=data.get('next', None),
                   previous=data.get('previous', None))

    def clone(self, task_uuid, workflow_uuid):
        """Return a copy of this container bound to another task and workflow.

        All fields are copied through ``to_dict``; only ``task_uuid`` and
        ``workflow_uuid`` are replaced by the given values.
        """
        data = self.to_dict()
        data.update(dict(task_uuid=task_uuid, workflow_uuid=workflow_uuid))
        return Container.from_dict(data)

    def to_dict(self) -> Dict[str, Any]:
        """Return the dataclass fields as a dictionary (via ``asdict``)."""
        return asdict(self)
@dataclass
class WorkflowInfo(Info):
    """This class stores all the information of a workflow.

    The workflow is modeled as an ordered sequence of blocks, where each
    block contains a list of the same type of task but with different
    parameters. For example, the average spectrum workflow is::

        block_1                 block_2              block_3                    block_4
        (Ground state tasks) -> (RT TDDFT tasks)  -> (compute spectrum tasks) -> (compute average spectrum)

        ground_state            rt tddft in x        compute spectrum in x      compute average spectrum
                                rt tddft in y        compute spectrum in y
                                rt tddft in z        compute spectrum in z

    parameters
    ----------
    label:
        User given name of the workflow.
    path:
        Path to workflow directory.
    name:
        The type of workflow, for example: spectrum, ksd.
    description:
        String, description about the workflow.
    engine:
        The engine used in the workflow.
    task_mode:
        If false, the workflow comes with a defined sequence of tasks.
        For example, the spectrum workflow is:
        ground_state -> RT TDDFT -> compute spectrum.
        If true, the user is given full control over the workflow to add
        any kind of task to it.
    param:
        Any parameters related to the workflow.
    steps:
        It's a list of blocks.
    containers:
        It's a list of containers. In a workflow, each task is associated
        with a container, which stores information about the task in
        context of the workflow. Currently, this list is used to navigate
        from one task to another in a workflow.
    state:
        It stores information about what tasks are running and what step
        the workflow is in. (Currently this variable is not in use.)
    dependencies_map:
        It's a dictionary that maps each task with the list of tasks that
        it depends on.
    tasks:
        It's a dictionary that maps the task uuid with the task_info objects.
    current_step:
        It's a list that stores the current task the workflow is in.
    """

    label: str
    path: Path
    _name: str = field(default='')
    description: str = field(default='')
    engine: Union[str, None] = field(default=None)
    task_mode: bool = field(default=False)
    param: Dict[Any, Any] = field(default_factory=dict)
    steps: List[Block] = field(default_factory=list)
    containers: List[Container] = field(default_factory=list)
    state: State = field(default_factory=factory_state)
    dependencies_map: Dict[str, str] = field(default_factory=dict)
    tasks: Dict[str, TaskInfo] = field(default_factory=dict)
    current_step: list = field(default_factory=list)

    @property
    def name(self):
        """The type of the workflow."""
        return self._name

    @name.setter
    def name(self, value):
        # The name may be assigned once, while it is still the empty default;
        # any later reassignment is refused.
        if self._name == '':
            self._name = value
        else:
            raise AttributeError('Denied')

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        """Create a ``WorkflowInfo`` from a dictionary.

        ``steps`` is accepted in three shapes:

        * empty: a single block named ``'task_mode'`` is created and the
          task uuid of every container is added to it;
        * a list of strings: one block per name is created and each
          container's task uuid is added to the block at ``block_id``;
        * a list of dictionaries: each one is converted with
          ``Block.from_dict``.

        ``data`` itself is not modified.
        """
        # Read (rather than pop) the state so the caller's dict is left intact.
        state = State.from_dict(data['state'])
        tasks = {task_uuid: TaskInfo.from_dict(task)
                 for task_uuid, task in data['tasks'].items()}
        containers = [Container.from_dict(container)
                      for container in data['containers']]
        current_step = data['current_step']

        steps_data = data['steps']
        steps = []
        if not steps_data:
            steps.append(Block(name='task_mode'))
            for container in containers:
                steps[0].task_uuids.append(container.task_uuid)
        elif isinstance(steps_data[0], str):
            for block in steps_data:
                steps.append(Block(name=block, store_same_task_type=True))
            for container in containers:
                steps[container.block_id].task_uuids.append(container.task_uuid)
        else:
            steps = [Block.from_dict(block) for block in steps_data]

        return cls(_uuid=data['_uuid'],
                   _name=data['_name'],
                   # Missing keys fall back to the field defaults, not None.
                   description=data.get('description', ''),
                   path=Path(data['path']),
                   label=data.get('label'),
                   engine=data.get('engine'),
                   param=data.get('param', dict()),
                   state=state,
                   task_mode=data.get('task_mode', False),
                   steps=steps,
                   containers=containers,
                   tasks=tasks,
                   dependencies_map=data['dependencies_map'],
                   current_step=current_step)
@dataclass
class ProjectInfo(Info):
    """This class stores all the information about a project.

    parameters
    ----------
    label:
        Name of the project.
    path:
        Path to the project directory.
    description:
        String describing the project.
    config:
        Configuration used in the project.
    workflows:
        List of all the workflow_info of workflows in the project.
    """

    label: str
    path: Path
    description: str = field(default='')
    config: Dict[Any, Any] = field(default_factory=dict)
    workflows: List[WorkflowInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        """Create a ``ProjectInfo`` from a dictionary.

        ``_uuid``, ``label``, ``description``, ``path`` and ``workflows`` are
        required (``KeyError`` if missing). Each entry of ``workflows`` is
        converted with ``WorkflowInfo.from_dict``.
        """
        workflows = [WorkflowInfo.from_dict(workflow)
                     for workflow in data['workflows']]
        return cls(_uuid=data['_uuid'],
                   label=data['label'],
                   description=data['description'],
                   path=Path(data['path']),
                   # Restore the saved configuration; a missing key falls back
                   # to an empty dict, matching the field default.
                   config=data.get('config', dict()),
                   workflows=workflows)

    @classmethod
    def clone(cls, project_info):
        # NOTE(review): returns the class itself and ignores ``project_info``;
        # this looks like an unfinished placeholder - confirm intent.
        return cls
def factory_task_info(name: str) -> TaskInfo:
    """Create a ``TaskInfo`` called ``name`` with a newly generated random uuid."""
    task_uuid = str(uuid.uuid4())
    return TaskInfo(task_uuid, name)