Source code for litesoph.common.data_sturcture.data_classes

from dataclasses import dataclass, field, asdict
import copy
from pathlib import Path
from typing import Any, Dict, List, Union
import json
import os
import uuid
from litesoph.common.data_sturcture.utils import WorkflowInfoEncoder

@dataclass
class State:
    """Store the state of jobs, tasks and workflows.

    The class declares no dataclass fields: ``from_dict`` attaches every key
    of the given mapping as an instance attribute, and ``to_dict`` returns
    those attributes again so a ``from_dict``/``to_dict`` round trip is
    lossless.
    """

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Create a State whose attributes are the key/value pairs of ``data``."""
        state = cls()
        for key in data:
            setattr(state, key, data[key])
        return state

    def to_dict(self) -> Dict[str, Any]:
        """Return the state as a new dict (values deep-copied).

        ``asdict`` alone only sees declared dataclass fields, of which State
        has none, so it would always return ``{}``; the dynamically assigned
        attributes set by ``from_dict`` are therefore merged in explicitly.
        Note that ``dataclasses.asdict`` on an enclosing dataclass recurses
        via ``fields()`` and does not call this method.
        """
        data = asdict(self)
        for key, value in vars(self).items():
            data.setdefault(key, copy.deepcopy(value))
        return data

def factory_state():
    """Default factory for ``State``-typed dataclass fields.

    Returns a brand-new, empty ``State`` on every call so instances never
    share one state object.
    """
    fresh_state = State()
    return fresh_state

@dataclass
class Info:
    """Common base for info records that carry a unique, read-only id.

    Subclasses inherit the ``_uuid`` field, the read-only ``uuid`` property
    and the dict/JSON conversion helpers below.
    """

    _uuid: str

    @property
    def uuid(self):
        """The object's unique identifier; assigning to it raises AttributeError."""
        identifier = self._uuid
        return identifier

    @uuid.setter
    def uuid(self, value):
        # The id is fixed when the object is constructed; refuse reassignment.
        raise AttributeError('Denied')

    def to_dict(self) -> Dict[str, Any]:
        """Return the dataclass fields converted to a dict via ``asdict``."""
        as_mapping = asdict(self)
        return as_mapping

    def to_json(self) -> str:
        """Serialize this object with ``WorkflowInfoEncoder`` (3-space indent)."""
        return json.dumps(self, indent=3, cls=WorkflowInfoEncoder)

@dataclass
class JobInfo:
    """This class stores information about a job.

    parameters
    ----------
    id:
        The id of the job.
    directory:
        The directory in which the job is running.
    state:
        State of the job.
    job_script:
        Bash script used to run the job.
    submit_mode:
        Whether the job was submitted locally or to a remote machine
        (defaults to ``'local'``).
    job_returncode:
        Return code of the job.
    submit_returncode:
        Return code of submitting the job to a remote machine.
    submit_output:
        Output of the job submission to a remote machine.
    submit_error:
        Errors of the job submission to a remote machine.
    output:
        Output of the job.
    error:
        Error of the job.
    """
    id: Union[str, None] = field(default=None)
    directory: Union[Path, None] = field(default=None)
    state: Union[str, None] = field(default=None)
    job_script: Union[str, None] = field(default=None)
    submit_mode: Union[str, None] = field(default='local')
    job_returncode: Union[int, None] = field(default=None)
    submit_returncode: Union[int, None] = field(default=None)
    submit_output: Union[str, None] = field(default=None)
    submit_error: Union[str, None] = field(default=None)
    output: Union[str, None] = field(default=None)
    error: Union[str, None] = field(default=None)

    @classmethod
    def from_dict(cls, data):
        """Build a JobInfo from a mapping of attribute names to values.

        Starts from a default-constructed JobInfo and sets every key of
        ``data`` as an attribute (keys are not validated against the
        declared fields). A string ``directory`` is converted to ``Path``.
        """
        job_info = cls()
        for key, value in data.items():
            if key == 'directory' and isinstance(value, str):
                value = Path(value)
            setattr(job_info, key, value)
        return job_info

@dataclass
class Block:
    """This class stores information about a block in the workflow.
    The block is a collection of tasks.

    parameters
    ----------

    name:
        The name of the block.
    store_same_task_type:
        True if all tasks in the block are of the same type.
    task_type:
        The type of task if "store_same_task_type" is true, else None.
    task_uuids:
        The list of uuids of the tasks in the block.
    metadata:
        Any optional information about the tasks."""

    name: str
    store_same_task_type: bool = False
    task_type: Union[str, None] = field(default=None)
    task_uuids: List[str] = field(default_factory=list)
    metadata: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data):
        """Build a Block from a mapping such as the one returned by ``to_dict``.

        ``task_uuids`` and ``metadata`` are copied, so the new block never
        shares mutable containers with ``data``; missing or ``None`` values
        fall back to an empty list/dict.
        """
        return cls(name=data['name'],
                   store_same_task_type=data.get('store_same_task_type', False),
                   task_type=data.get('task_type', None),
                   # Copy so appending to block.task_uuids cannot mutate the caller's data.
                   task_uuids=list(data.get('task_uuids') or []),
                   metadata=dict(data.get('metadata') or {}))

    def clone(self):
        """Return a new block with the same settings and metadata but no tasks."""
        data = self.to_dict()
        data['task_uuids'] = []
        return type(self).from_dict(data)

    def to_dict(self):
        """Return the block's fields as a new dict (via ``asdict``)."""
        return asdict(self)

    
def factory_job_info():
    """Default factory for ``job_info`` fields: a new JobInfo with default values."""
    job_info = JobInfo()
    return job_info
    
@dataclass
class TaskInfo(Info):
    """This class stores all the information about a task.

    parameters
    ----------
    name:
        Task identifier.
    engine:
        Which engine to use to run the task.
    state:
        Stores information about the state of the task.
    path:
        Path of the workflow directory.
    task_data:
        Any miscellaneous information about the task that depends on the
        engine.
    param:
        The input parameters of the task.
    engine_param:
        The input parameters of the task in the format of the engine used.
    input:
        A dictionary that stores the input files generated for the task.
    output:
        Stores the output files generated by the task.
    local_copy_files:
        List of relative paths of files to be copied to clone a task.
    remote_copy_files:
        List of relative paths of files to be copied from the remote machine.
    job_info:
        Contains all the information about submitting and running the job.
        (This is a new feature in development.)
    network:
        Contains information about the job that was submitted to the network.
    local:
        Contains information about the job that was submitted locally.
        (``network`` and ``local`` will be removed once ``job_info`` is
        incorporated.)
    """
    _name: str
    engine: Union[str, None] = field(default=None)
    state: State = field(default_factory=factory_state)
    path: Union[Path, None] = field(default=None)
    task_data: Dict[Any, Any] = field(default_factory=dict)
    param: Dict[Any, Any] = field(default_factory=dict)
    engine_param: Dict[Any, Any] = field(default_factory=dict)
    input: Dict[Any, Any] = field(default_factory=dict)
    output: Dict[Any, Any] = field(default_factory=dict)
    local_copy_files: List[Any] = field(default_factory=list)
    remote_copy_files: List[Any] = field(default_factory=list)
    job_info: JobInfo = field(default_factory=factory_job_info)
    network: Dict[Any, Any] = field(default_factory=dict)
    local: Dict[Any, Any] = field(default_factory=dict)

    @property
    def name(self):
        """The task identifier (read-only)."""
        return self._name

    @name.setter
    def name(self, value):
        raise AttributeError('Denied')

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Build a TaskInfo from a mapping such as the one produced by ``to_dict``.

        ``_uuid`` and ``_name`` are required; every other key is optional and
        falls back to the field's default. ``path`` is converted to ``Path``,
        ``state`` to ``State`` and a dict ``job_info`` to ``JobInfo``.

        Raises:
            KeyError: if ``_uuid`` or ``_name`` is missing.
        """
        path = data.get('path', None)
        if path is not None:
            path = Path(path)

        # asdict()/to_dict() write these lists under the field names
        # ``local_copy_files``/``remote_copy_files``; the ``*_copy_list`` keys
        # read previously are still honored first for existing saved data.
        local_copy_files = data.get('local_copy_list',
                                    data.get('local_copy_files', list()))
        remote_copy_files = data.get('remote_copy_list',
                                     data.get('remote_copy_files', list()))

        job_info = data.get('job_info', None)
        if job_info is None:
            job_info = JobInfo()
        elif isinstance(job_info, dict):
            job_info = JobInfo.from_dict(job_info)

        return cls(_uuid=data['_uuid'],
                   _name=data['_name'],
                   path=path,
                   engine=data.get('engine', None),
                   state=State.from_dict(data.get('state', dict())),
                   param=data.get('param', dict()),
                   input=data.get('input', dict()),
                   output=data.get('output', dict()),
                   local_copy_files=local_copy_files,
                   remote_copy_files=remote_copy_files,
                   task_data=data.get('task_data', dict()),
                   engine_param=data.get('engine_param', dict()),
                   job_info=job_info,
                   network=data.get('network', dict()),
                   local=data.get('local', dict()))

    def clone(self, task_info):
        """Deep-copy every attribute except ``_uuid`` onto ``task_info``.

        ``task_info`` keeps its own uuid; it is modified in place and returned.
        """
        for key, value in self.__dict__.items():
            if key == '_uuid':
                continue
            setattr(task_info, key, copy.deepcopy(value))
        return task_info
@dataclass
class Container:
    """This class stores information about a task in the context of the workflow.
    Each container can be associated with only one task.

    parameters
    ----------
    id:
        Index of the container in the containers list.
    block_id:
        Index of the block which the task belongs to.
    task_type:
        The type of the task it is associated with.
    task_uuid:
        The uuid of the task it is associated with.
    workflow_uuid:
        The uuid of the workflow the task is present in.
    parameters:
        A dictionary that contains the parameters of the task in the context
        of the workflow.
    env_parameters:
        Stores any miscellaneous information in the context of the workflow.
    next:
        Stores the uuid of the next task.
    previous:
        Stores the uuid of the previous task."""
    id: int
    block_id: int
    task_type: str
    task_uuid: str
    workflow_uuid: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    env_parameters: Dict[str, Any] = field(default_factory=dict)
    next: Union[str, None] = field(default=None)
    previous: Union[str, None] = field(default=None)

    @classmethod
    def from_dict(cls, data: Dict[Any, Any]):
        """Build a Container from a mapping such as the one returned by ``to_dict``.

        ``parameters`` and ``env_parameters`` are shallow-copied so the
        container does not share those dicts with ``data``; missing or
        ``None`` values become empty dicts.

        Raises:
            KeyError: if any of the required identifying keys is missing.
        """
        return cls(id=data['id'],
                   block_id=data['block_id'],
                   task_type=data['task_type'],
                   task_uuid=data['task_uuid'],
                   workflow_uuid=data['workflow_uuid'],
                   parameters=dict(data.get('parameters') or {}),
                   env_parameters=dict(data.get('env_parameters') or {}),
                   next=data.get('next', None),
                   previous=data.get('previous', None))

    def clone(self, task_uuid, workflow_uuid):
        """Return a copy of this container re-pointed at ``task_uuid`` in ``workflow_uuid``."""
        data = self.to_dict()
        data.update(task_uuid=task_uuid, workflow_uuid=workflow_uuid)
        return type(self).from_dict(data)

    def to_dict(self) -> Dict[str, Any]:
        """Return the container's fields as a new dict (via ``asdict``)."""
        return asdict(self)
@dataclass
class WorkflowInfo(Info):
    """This class stores all the information of a workflow.

    The workflow is modeled as an ordered sequence of blocks, where each block
    contains a list of tasks of the same type but with different parameters.
    For example, the average spectrum workflow is::

        block_1(ground state tasks) -> block_2(RT TDDFT tasks) -> block_3(compute spectrum tasks) -> block_4(compute average spectrum)

        ground_state       rt tddft in x       compute spectrum in x       compute average spectrum
                           rt tddft in y       compute spectrum in y
                           rt tddft in z       compute spectrum in z

    parameters
    ----------
    label:
        User-given name of the workflow.
    path:
        Path to the workflow directory.
    name:
        The type of workflow, for example: spectrum, ksd. It can be assigned
        once while it is still empty and is read-only afterwards.
    description:
        String describing the workflow.
    engine:
        The engine used in the workflow.
    task_mode:
        If False, the workflow comes with a defined sequence of tasks. For
        example, the spectrum workflow is: ground_state -> RT TDDFT ->
        compute spectrum. If True, the user is given full control over the
        workflow and may add any kind of task to it.
    param:
        Any parameters related to the workflow.
    steps:
        A list of blocks.
    containers:
        A list of containers. In a workflow, each task is associated with a
        container, which stores information about the task in the context of
        the workflow. Currently, this list is used to navigate from one task
        to another in a workflow.
    state:
        Stores information about which tasks are running and which step the
        workflow is in. (Currently this variable is not in use.)
    dependencies_map:
        A dictionary that maps each task to its dependency tasks.
        (NOTE(review): the original description is ambiguous about the
        direction of the dependency — confirm.)
    tasks:
        A dictionary that maps each task uuid to its TaskInfo object.
    current_step:
        A list that stores the current task the workflow is in.
    """
    label: str
    path: Path
    _name: str = field(default='')
    description: str = field(default='')
    engine: Union[str, None] = field(default=None)
    task_mode: bool = field(default=False)
    param: Dict[Any, Any] = field(default_factory=dict)
    steps: List[Block] = field(default_factory=list)
    containers: List[Container] = field(default_factory=list)
    state: State = field(default_factory=factory_state)
    dependencies_map: Dict[str, str] = field(default_factory=dict)
    tasks: Dict[str, TaskInfo] = field(default_factory=dict)
    current_step: list = field(default_factory=list)

    @property
    def name(self):
        """The workflow type; see the class docstring."""
        return self._name

    @name.setter
    def name(self, value):
        # Allow a single assignment while the name is still unset.
        if self._name == '':
            self._name = value
        else:
            raise AttributeError('Denied')

    @staticmethod
    def _steps_from_data(steps_data, containers):
        """Rebuild the list of Blocks from serialized ``steps`` data.

        Three layouts are handled:

        * empty/None -> one 'task_mode' block holding every container's task;
        * a list of block names (str) -> one same-type Block per name, with
          each container's task appended to the block at its ``block_id``;
        * a list of dicts -> each one passed to ``Block.from_dict``.
        """
        if not steps_data:
            block = Block(name='task_mode')
            block.task_uuids.extend(container.task_uuid for container in containers)
            return [block]
        if isinstance(steps_data[0], str):
            steps = [Block(name=block_name, store_same_task_type=True)
                     for block_name in steps_data]
            for container in containers:
                steps[container.block_id].task_uuids.append(container.task_uuid)
            return steps
        return [Block.from_dict(block) for block in steps_data]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        """Build a WorkflowInfo from a mapping such as the one produced by ``to_dict``.

        ``data`` is only read, never modified, so the same mapping can be
        deserialized more than once. ``_uuid`` and ``path`` are required;
        optional keys fall back to the field defaults.

        Raises:
            KeyError: if ``_uuid`` or ``path`` is missing.
        """
        # Read rather than pop 'state': popping mutated the caller's dict and
        # made a second from_dict() on the same data fail with KeyError.
        state = State.from_dict(data.get('state') or {})
        tasks = {task_uuid: TaskInfo.from_dict(task)
                 for task_uuid, task in data.get('tasks', {}).items()}
        containers = [Container.from_dict(container)
                      for container in data.get('containers', [])]
        steps = cls._steps_from_data(data.get('steps'), containers)

        return cls(_uuid=data['_uuid'],
                   _name=data.get('_name', ''),
                   description=data.get('description') or '',
                   path=Path(data['path']),
                   label=data.get('label'),
                   engine=data.get('engine'),
                   param=data.get('param') or {},
                   state=state,
                   task_mode=data.get('task_mode', False),
                   steps=steps,
                   containers=containers,
                   tasks=tasks,
                   dependencies_map=data.get('dependencies_map', {}),
                   current_step=data.get('current_step', []))
@dataclass
class ProjectInfo(Info):
    """This class stores all the information about a project.

    parameters
    ----------
    label:
        Name of the project.
    path:
        Path to the project directory.
    description:
        String describing the project.
    config:
        Configuration used in the project.
    workflows:
        List of the WorkflowInfo objects of all workflows in the project."""
    label: str
    path: Path
    description: str = field(default='')
    config: Dict[Any, Any] = field(default_factory=dict)
    workflows: List[WorkflowInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        """Build a ProjectInfo from a mapping such as the one produced by ``to_dict``.

        Each entry of ``workflows`` is deserialized with
        ``WorkflowInfo.from_dict``; ``path`` is converted to ``Path``.

        Raises:
            KeyError: if ``_uuid``, ``label`` or ``path`` is missing.
        """
        workflows = [WorkflowInfo.from_dict(workflow)
                     for workflow in data.get('workflows', [])]
        return cls(_uuid=data['_uuid'],
                   label=data['label'],
                   description=data.get('description', ''),
                   path=Path(data['path']),
                   # Restore config as well; omitting it loses the project
                   # configuration on every to_dict/from_dict round trip.
                   config=data.get('config', {}),
                   workflows=workflows)

    @classmethod
    def clone(cls, project_info):
        # NOTE(review): this returns the ProjectInfo class itself and ignores
        # ``project_info`` -- it does not produce a copy. Looks unfinished;
        # confirm the intended behavior before relying on it.
        return cls
def factory_task_info(name: str) -> TaskInfo:
    """Create a TaskInfo called ``name`` with a freshly generated random (uuid4) id."""
    task_id = str(uuid.uuid4())
    return TaskInfo(_uuid=task_id, _name=name)