Source code for litesoph.common.workflow_manager

from typing import Dict, Union, List, Any, Union 
import copy
import os
from pathlib import Path
import shutil
from litesoph.common.task import Task
from litesoph.common.workflows_data import predefined_workflow, WorkflowTypes
from litesoph.common.engine_manager import EngineManager
from litesoph.common.data_sturcture import TaskInfo, WorkflowInfo, factory_task_info, Container, Block
import importlib
from litesoph.common.task_manager import check_task_completion
from litesoph.common.decision_tree import decide_engine, EngineDecisionError
from litesoph.engines import engine_classname


class TaskSetupError(Exception):
    """Raised when unable to creating or opening task."""

class WorkflowEnded(Exception):
    """Raised when the workflow has ended."""

[docs] class WorkflowManager: """This is the main interface to edit, modify and run workflows. In litesoph, workflow is modeled as a chain of blocks, where each block contains a list of simple tasks that the user can create and run. For example, consider the average spectrum workflow. We represent the average spectrum workflow as a chain of four blocks, where each block contains the same types of tasks but with different input parameters. :: Block-1 Block-2 Block-3 Block-4 |---------------| |-----------------| |--------------------| |----------------| | | | | | | | | | 1. ground |----> | 2. RT-TDDFT- x |----->| 5.compute-spectra-x|--->| 8. compute | | state | | 3. RT-TDDFT- y | | 6.compute-spectra-y| | average spectra| |---------------| | 4. RT-TDDFT- z | | 7.compute-spectra-z| |----------------| |-----------------| |--------------------| The dependenices_map maps how each simple task depends on the previous tasks. so, for above workflow :: 1 --> None : ground state doesn't depend on any tasks 2 --> 1 : RT-TDDFT-x dependents on ground state. 3 --> 1 : RT-TDDFT-y dependents on ground state. 4 --> 1 : RT-TDDFT-z dependents on ground state. 5 --> 2 : compute_spectra-x dependents on RT-TDDFT-x. 6 --> 3 : compute_spectra-y dependents on RT-TDDFT-y. 7 --> 4 : compute_spectra-z dependents on RT-TDDFT-z. 8 --> (5, 6, 7) : compute average spectra dependent on compute_spectra-x. compute_spectra-y and compute-spectra-z. parameters ---------- project_manager: The class that manages creation and deletion of workflows: workflow_info: This objects is used to store all the information about a workflow. config: The configurations used to run the tasks in the workflow. """ def __init__(self, project_manager, workflow_info: WorkflowInfo, config: Dict[str, str]) -> None: self.project_manager = project_manager self.config = config self.workflow_info = workflow_info self.workflow_type = workflow_info.name self.task_mode = workflow_info.task_mode self.engine = workflow_info.engine self.steps = workflow_info.steps self.containers = workflow_info.containers self.tasks = workflow_info.tasks self.directory = workflow_info.path self.current_step = workflow_info.current_step self.dependencies_map = workflow_info.dependencies_map self.current_task_info = None if not self.workflow_info.engine: self.choose_default_engine() if not self.current_step: if self.workflow_type == WorkflowTypes.TASK_MODE: self.task_mode = workflow_info.task_mode = True else: self.workflow_from_db = predefined_workflow.get(self.workflow_type) update_workflowinfo(self.workflow_from_db, workflow_info) self.current_container = self.containers[0] self.current_step.insert(0, 0) self.current_task_info = self.tasks.get(self.current_container.task_uuid) self.prepare_task() else: self.current_container = self.containers[self.current_step[0]] self.current_task_info = self.tasks.get(self.current_container.task_uuid)
[docs] def choose_default_engine(self): """Chooses a default engine for a given workflow type and sets the engine for the workflow.""" self.workflow_info.engine = decide_engine(self.workflow_type) self.engine = self.workflow_info.engine
def _get_engine_manager(self, engine_name) -> EngineManager: engine_class = engine_classname.get(engine_name) module_path = f'litesoph.engines.{engine_name}.{engine_name}_manager' engine_module = importlib.import_module(module_path) engine_manager = getattr(engine_module, f'{engine_class}Manager') return engine_manager()
[docs] def get_engine_task(self) -> Task: """This method returns the Task object of the current_task in the workflow.""" return self._get_task(self.current_task_info, task_dependencies=self.get_task_dependencies())
def _get_task(self, current_task_info, task_dependencies ) -> Task: engine_manager = self._get_engine_manager(self.engine) current_task_info.engine = self.engine task = engine_manager.get_task(self.config, self.workflow_type, current_task_info, task_dependencies) return task
[docs] def get_taskinfo(self, task_name) -> List[TaskInfo]: """This method returns the list of TaskInfo object with the given name in the workflow.""" task_list = [] for task_info in self.tasks.values(): if task_info.name == task_name: task_list.append(task_info) else: continue return task_list
[docs] def check_engine(self, engine)-> bool: """Checks whether a given engine implements the current workflow type""" engine_manager = self._get_engine_manager(engine) workflow_list = engine_manager.get_workflow_list() if self.workflow_type in workflow_list: return True else: return False
[docs] def set_engine(self, engine): """sets the engine of the workflow""" if self.workflow_info.task_mode: self.workflow_info.engine = engine self.engine = self.workflow_info.engine else: check = self.check_engine(engine) if check: self.workflow_info.engine = engine self.engine = self.workflow_info.engine else: raise EngineDecisionError(f'workflow: {self.workflow_type} is not supported implemented in {engine}')
[docs] def get_task_dependencies(self): """Returns a list of previous task_infos that the present task in dependent on.""" dependices_uuid = self.dependencies_map.get(self.current_task_info.uuid) depedent_task_infos = [] if isinstance(dependices_uuid ,str): depedent_task_infos.append(self.tasks.get(dependices_uuid)) elif isinstance(dependices_uuid, list): depedent_task_infos.extend(self.tasks.get(task_uuid) for task_uuid in dependices_uuid) for task_info in depedent_task_infos: if not check_task_completion(task_info): raise TaskSetupError(f'The Dependent task : {task_info.name}, uuid:{task_info.uuid} is not completed.') return depedent_task_infos
[docs] def next(self): """This method changes the current task to the next task in the workflow.""" self.save() if not self.current_step: self.current_container = self.containers[0] self.current_step.insert(0, 0) self.current_task_info = self.tasks.get(self.current_container.task_uuid) self.prepare_task() return if self.current_container.next is None: raise WorkflowEnded('No more tasks in the workflow.') self.current_step[0] += 1 self.current_container = self.containers[self.current_step[0]] self.current_task_info = self.tasks.get(self.current_container.task_uuid) self.prepare_task()
[docs] def add_block(self, block_id: int, name: str, store_same_task_type:bool = False, task_type = None, metadata = dict()): """ This method inserts a block into the workflow. parameters ---------- block_id: The index where block to be palce in the workflow. name: The name of the block. store_same_task_type: the variable which indicative if the block contain same type of the tasks. task_type: task type if the store_same_task_type is true. metadata: This stores information about the tasks in the blocks in the context of the workflow.""" if not store_same_task_type and task_type is not None: raise TaskSetupError('task_type must be None if store_same_task_type is True.') block = Block(name, store_same_task_type, task_type=task_type, metadata=metadata) self.steps.insert(block_id, block)
[docs] def add_task(self, task_name: str, block_id: int, step_id: int, parameters= dict(), env_parameters= dict(), dependent_tasks_uuid: Union[str, list]= list()): """This method adds a task into the workflow. parameters ---------- task_name: The task type. block_id: The index of the block to which the task to be added. step_id: The index in the task execution list to where task to be added. paremeter: The default input parameters of the task. env_parameters: This stores information about the task in the context of the workflow. dependent_tasks_uuid: The list of task_uuids to which the task dependents on. """ task_info = factory_task_info(task_name) try : self.steps[block_id].task_uuids.append(task_info.uuid) except IndexError: raise TaskSetupError(f'The block:{block_id} is not defined.') self.tasks[task_info.uuid] = task_info new_container = Container(step_id, block_id, task_name, task_info.uuid, self.workflow_info.uuid, parameters, env_parameters) if step_id == 0: self.containers.append(new_container) elif step_id == len(self.containers): self.containers.append(new_container) new_container.previous = self.containers[-2].task_uuid self.containers[-2].next = new_container.task_uuid else: self.containers.insert(step_id, new_container) new_container.previous = self.containers[step_id-1].task_uuid self.containers[step_id-1].next = new_container.task_uuid new_container.next = self.containers[step_id +1].task_uuid for container in self.containers[step_id+1:]: container.id += 1 self.add_dependency(task_info.uuid, dependent_tasks_uuid)
[docs] def add_dependency(self, task_uuid: str, dependent_tasks_uuid: Union[str, list]= list()): """Adds a dependency task list to the given task.""" dependent_list =[] if isinstance(dependent_tasks_uuid, str): dependent_list.append(dependent_tasks_uuid) else: dependent_list.extend(dependent_tasks_uuid) dependent_id = self.dependencies_map.get(task_uuid, None) if not dependent_id: self.dependencies_map.update({task_uuid: dependent_list}) return if isinstance(dependent_id, str): dependent_list.insert(0, dependent_id) self.dependencies_map.update({task_uuid: dependent_list}) else: dependent_id.extend(dependent_list) self.dependencies_map.update({task_uuid: dependent_id})
def check_block(self, block_id): check = True if block_id > len(self.steps)-1: check = False return check def get_continer_by_task_uuid(self, task_uuid): for container in self.containers: if container.task_uuid == task_uuid: return container raise ValueError(f'task_uuid:{task_uuid} not present in containers') def get_container_index(self, task_uuid): container = self.get_continer_by_task_uuid(task_uuid) return container.id def get_continer_by_block_id(self, block_id): for container in self.containers: if container.task_uuid == block_id: return container def prepare_task(self): if self.engine: self.current_task_info.engine = self.engine engine_manager = self._get_engine_manager(self.engine) param = engine_manager.get_default_task_param(self.current_task_info.name, self.get_task_dependencies()) param.update(self.current_container.parameters) self.current_task_info.param.update(param) self.current_task_info.path = self.directory def change_current_task(self, task_uuid): self.current_task_info = self.tasks.get(task_uuid) def check(self): pass def previous(self): pass
[docs] def clone(self, clone_workflow: WorkflowInfo, branch_point: int) -> WorkflowInfo: """This method clones a new workflow_info from the existing workflow_info. It clones the task_info from the existing workflow_info and with that it copies all the input and output files generated from that task into the new directory of the cloned task. """ # The concept of the blocks was introduced later then the concept of containers, so # loop over containers to clone the workflow. # The better solution might be to loop over blocks instead of containers. clone_workflow.engine = copy.deepcopy(self.engine) for block in self.steps: clone_workflow.steps.append(block.clone()) previous_container = None for _, container in enumerate(self.containers): ctask_info = factory_task_info(container.task_type) clone_container = container.clone(ctask_info.uuid, self.workflow_info.uuid) if previous_container is not None: previous_container.next = clone_container.task_uuid clone_container.previous = previous_container.task_uuid clone_workflow.containers.append(clone_container) previous_container = clone_container clone_workflow.steps[clone_container.block_id].task_uuids.append(ctask_info.uuid) parent_task_info = self.tasks.get(container.task_uuid) if container.block_id <= branch_point: ctask_info = parent_task_info.clone(ctask_info) ctask_info.path = copy.deepcopy(clone_workflow.path) copy_task_files(self.directory, parent_task_info.local_copy_files, clone_workflow.path) clone_workflow.tasks[ctask_info.uuid] = ctask_info dependent_tasks = self.dependencies_map.get(container.task_uuid) if not dependent_tasks: clone_workflow.dependencies_map[ctask_info.uuid] = None elif isinstance(dependent_tasks, str): index = self.get_container_index(dependent_tasks) clone_workflow.dependencies_map[ctask_info.uuid] = clone_workflow.containers[index].task_uuid elif isinstance(dependent_tasks, list): clone_workflow.dependencies_map[ctask_info.uuid] = [] for dtask in dependent_tasks: index = self.get_container_index(dtask) clone_workflow.dependencies_map[ctask_info.uuid].append(clone_workflow.containers[index].task_uuid) clone_workflow.current_step.insert(0, branch_point) return clone_workflow
def get_summary(self): pass def run_next_task(self): pass def run_block(self,): pass
[docs] def save(self): """ saves the workflow_info into the hard drive.""" self.project_manager.save()
def copy_task_files(source ,file_list, destination): workflow_path = Path(source) destination_path = Path(destination) for path in file_list: s_path = Path.joinpath(workflow_path, path) d_path = Path.joinpath(destination_path, path) sub_path = destination_path for part in Path(path).parent.parts: sub_path = sub_path / part if not sub_path.exists(): os.mkdir(sub_path) continue if s_path.is_dir(): shutil.copytree(s_path, d_path, dirs_exist_ok=True) continue shutil.copy(s_path, d_path) def update_workflowinfo(workflow_dict:dict, workflowinfo: WorkflowInfo): blocks = workflow_dict.get('blocks') task_sequence = workflow_dict.get('task_sequence') w_dependency = workflow_dict.get('dependency_map') steps = workflowinfo.steps tasks = workflowinfo.tasks containers = workflowinfo.containers dependencies = workflowinfo.dependencies_map tasks.clear() dependencies.clear() for block in blocks: steps.append(Block(name= block['name'], store_same_task_type= block.get('store_same_task_type', True), task_type=block.get('task_type'), metadata= block.get('metadata', dict()))) prev_cont = None for wstep in task_sequence: taskinfo = factory_task_info(wstep.task_type) container = Container(task_sequence.index(wstep), wstep.block_id, wstep.task_type, taskinfo.uuid, workflowinfo.uuid, wstep.parameters, wstep.env_parameters) steps[wstep.block_id].task_uuids.append(taskinfo.uuid) if prev_cont is not None: prev_cont.next = container.task_uuid container.previous = prev_cont.task_uuid containers.append(container) prev_cont = container dependent_tasks = w_dependency.get(str(task_sequence.index(wstep))) if dependent_tasks is None: dependencies[taskinfo.uuid] = None elif isinstance(dependent_tasks, str): dependencies[taskinfo.uuid] = containers[int(dependent_tasks)].task_uuid elif isinstance(dependent_tasks, list): dependencies[taskinfo.uuid] = [] for dtask_index in dependent_tasks: dependencies[taskinfo.uuid].append(containers[int(dtask_index)].task_uuid) tasks[taskinfo.uuid] = taskinfo