Source code for litesoph.common.project_manager

import shutil
from typing import Any, Dict
import uuid
import copy

from pathlib import Path
import os
from litesoph.common.utils import create_dir, PROJECT_DATA_FILE_RELATIVE_PATH, PROJECT_DATA_FILE_NAME, WORKFLOW_DATA_FILE_NAME
from litesoph.common.data_sturcture.data_classes import ProjectInfo, WorkflowInfo
from litesoph.visualization.visualize_geometry import VisualizeGeometry
from litesoph.common.workflow_manager import WorkflowManager
from litesoph.common.workflows_data import predefined_workflow
from litesoph.common.decision_tree import decide_engine

class WorkflowSetupError(Exception):
    """Raised when a workflow cannot be created, cloned, or started."""

class ProjectManager:
    """Create, load and manage all the workflows of a project.

    Parameters
    ----------
    ls_manager:
        LSManager object loaded with a project.
    project_info:
        ProjectInfo to store all the information generated in the project.
    """

    def __init__(self, ls_manager, project_info: "ProjectInfo") -> None:
        self.ls_manager = ls_manager
        self.project_info = project_info
        self.label = project_info.label
        # NOTE: this is the same list object as ``project_info.workflows``;
        # appending/removing here is what ends up in the saved project data.
        self.workflow_list = project_info.workflows
        self.project_path = project_info.path
        self.config = project_info.config
        self.current_workflow_info = None

        vis_tools = self.config.get('visualization_tools', None)
        self.visualize = VisualizeGeometry(vis_tools)

        if self.workflow_list:
            # Resume on the most recently added workflow.
            self.current_workflow_info = self.workflow_list[-1]
        else:
            # A project always has at least one workflow to work in.
            self.new_workflow('workflow_1')
        self.save()

    def _create_workflow_info(self, label: str, description: str = ''):
        """Create the directory ``<project_path>/<label>`` and a WorkflowInfo for it.

        Raises
        ------
        WorkflowSetupError
            If the workflow directory cannot be created.
        """
        current_workflow = self.project_path / label
        try:
            create_dir(current_workflow)
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback.
            raise WorkflowSetupError(f'Unable to create New workflow. Error:{e}') from e

        return WorkflowInfo(str(uuid.uuid4()),
                            label=label,
                            path=current_workflow,
                            description=description)

    def new_workflow(self, label: str, description: str = ''):
        """Create a new workflow info, make it current and save it in the project.

        Parameters
        ----------
        label:
            user given name of the workflow
        description:
            description or comments about the workflow.
        """
        self.current_workflow_info = self._create_workflow_info(label=label,
                                                                description=description)
        self.append_workflow(self.current_workflow_info)
        self.save()

    def clone_workflow(self, workflow_info_uuid: str,
                       traget_workflow_type: str,
                       branch_point: int,
                       label: str,
                       description: str = ''):
        """Create a new workflow by cloning an existing workflow of the project.

        Parameters
        ----------
        workflow_info_uuid:
            The uuid of the source workflow
        traget_workflow_type:
            The workflow type of the cloned workflow. It must be equal to the
            type (``name``) of the source workflow.
        branch_point:
            The block id in the workflow up to which the workflow is cloned.
        label:
            label of the new cloned workflow
        description:
            description or comments about the cloned workflow.

        Raises
        ------
        ValueError
            If no workflow with ``workflow_info_uuid`` exists.
        WorkflowSetupError
            If the target type differs from the source workflow type, or the
            directory of the clone cannot be created.
        """
        workflow_info = self.get_workflow_info(workflow_info_uuid)

        if workflow_info.name != traget_workflow_type:
            raise WorkflowSetupError('The traget worrkflow should be same as the parent workflow.')

        workflow_manager = WorkflowManager(self, workflow_info, self.config)
        cloned_workflow_info = self._create_workflow_info(label=label,
                                                          description=description)
        cloned_workflow_info.name = copy.deepcopy(workflow_info.name)
        cloned_workflow_info = workflow_manager.clone(cloned_workflow_info,
                                                      branch_point=branch_point)
        self.append_workflow(cloned_workflow_info)
        self.save()

    def _get_workflow_manager(self, name):
        """Return the manager class for workflow type ``name``.

        Every known type is currently handled by ``WorkflowManager``; this
        method only validates that ``name`` is 'task_mode' or a predefined
        workflow.

        Raises
        ------
        WorkflowSetupError
            If ``name`` is neither 'task_mode' nor a predefined workflow.
        """
        if name == 'task_mode':
            return WorkflowManager

        workflow_type = predefined_workflow.get(name, None)
        if not workflow_type:
            raise WorkflowSetupError(f'Workflow:{name} not defined.')
        return WorkflowManager

    def start_workflow(self, workflow_type: str, param: Dict[str, Any]) -> "WorkflowManager":
        """Instantiate the workflow manager for ``workflow_type`` on the current workflow.

        Parameters
        ----------
        workflow_type:
            The workflow identifier.
        param:
            parameters merged into the ``param`` of the current workflow.

        Raises
        ------
        WorkflowSetupError
            If there is no current workflow or ``workflow_type`` is not defined.
        """
        if not self.current_workflow_info:
            raise WorkflowSetupError('Create workflow')

        # Validate the type before touching the current workflow info.
        workflow_manager_cls = self._get_workflow_manager(workflow_type)

        self.current_workflow_info.name = workflow_type
        if workflow_type == 'task_mode':
            self.current_workflow_info.task_mode = True
        self.current_workflow_info.param.update(param)

        return workflow_manager_cls(self, self.current_workflow_info, config=self.config)

    def open_workflow(self, workflow_uuid: str) -> "WorkflowManager":
        """Open an already existing and defined workflow and make it current.

        Raises
        ------
        ValueError
            If no workflow with ``workflow_uuid`` exists.
        WorkflowSetupError
            If the stored workflow type is not defined.
        """
        workflow_info = self.get_workflow_info(workflow_uuid)
        workflow_manager_cls = self._get_workflow_manager(workflow_info.name)
        workflow_manager = workflow_manager_cls(self, workflow_info, config=self.config)
        self.current_workflow_info = workflow_info
        return workflow_manager

    def get_workflow_info(self, workflow_uuid):
        """Return the WorkflowInfo object with the given uuid.

        Raises
        ------
        ValueError
            If no workflow in the project has this uuid.
        """
        for workflow in self.workflow_list:
            if workflow.uuid == workflow_uuid:
                return workflow
        raise ValueError(f"workflow with uuid: {workflow_uuid} doesn't exists.")

    # NOTE: the method name shadows the builtin ``list`` inside the class body
    # only; method bodies still resolve ``list`` to the builtin.
    def list(self) -> list:
        """Return a list of ``(label, uuid)`` tuples of all the workflows in the project."""
        return [(workflow.label, workflow.uuid) for workflow in self.workflow_list]

    def add_geometry(self, geometry_file):
        """Copy ``geometry_file`` into the current workflow as ``coordinate.xyz``.

        The path of the copy is stored under the ``'geometry'`` key of the
        current workflow's ``param``.
        """
        geom_path = self.current_workflow_info.path / "coordinate.xyz"
        shutil.copy(geometry_file, geom_path)
        self.current_workflow_info.param.update({'geometry': str(geom_path)})

    def check(self) -> bool:
        """Return True if the current workflow has a geometry registered."""
        return bool(self.current_workflow_info.param.get('geometry', None))

    def visualize_geometry(self):
        """Render the geometry of the current workflow.

        Uses the registered ``'geometry'`` path, falling back to
        ``coordinate.xyz`` in the workflow directory.
        """
        default_geom_file = self.current_workflow_info.path / 'coordinate.xyz'
        geom_file = self.current_workflow_info.param.get('geometry', str(default_geom_file))
        self.visualize.render(geom_file)

    def get_summary(self):
        """Placeholder; not implemented."""
        pass

    def save(self):
        """Write the project info as JSON to the project data file.

        Any exception raised by ``project_info.to_json()`` propagates to the
        caller.
        """
        file = self.project_path / self.ls_manager.project_data_file_relative_path
        # Serialise before opening the file so a serialisation failure
        # does not truncate the existing project data.
        json_txt = self.project_info.to_json()
        with open(file, 'w') as f:
            f.write(json_txt)

    def remove(self, workflow_uuid):
        """Remove a workflow from the project and delete its directory.

        Does nothing if no workflow has the given uuid.
        """
        # NOTE(review): this neither calls save() nor updates
        # current_workflow_info -- confirm callers handle both.
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the element following each removed entry.
        for workflow in tuple(self.workflow_list):
            if workflow.uuid == workflow_uuid:
                shutil.rmtree(str(workflow.path))
                self.workflow_list.remove(workflow)

    def _change_directory(self, path):
        """Change the current working directory to ``path``."""
        os.chdir(path)

    def list_available_workflows(self):
        """Return a list of all the predefined workflow types."""
        return list(predefined_workflow)

    def append_workflow(self, workflow_info: "WorkflowInfo"):
        """Add ``workflow_info`` to the project unless its uuid is already present."""
        if any(workflow.uuid == workflow_info.uuid for workflow in self.workflow_list):
            return
        self.workflow_list.append(workflow_info)

    def get_predefined_workflow(self):
        """Placeholder; not implemented."""
        pass
def summary_of_current_project(project_data: dict):
    """Build a plain-text summary of the finished tasks, grouped by engine.

    Parameters
    ----------
    project_data:
        Project dictionary. The keys 'name', 'path', 'tasks' and 'geometry'
        are treated as metadata; every other key is treated as an engine
        name whose value maps task names to task records.
        NOTE(review): task records are assumed to be dicts with an optional
        'done' entry -- confirm against the caller.

    Returns
    -------
    str
        Newline-joined summary. Only tasks whose 'done' entry is truthy are
        listed. If the project contains no engine entries, the summary says
        that no tasks have been performed yet.
    """
    non_engine = ('name', 'path', 'tasks', 'geometry')
    # Filter the metadata keys first, so that a project holding only
    # metadata is reported as having no tasks.
    engines = [key for key in project_data if key not in non_engine]

    lines = ["Summary of all the tasks performed."]
    if not engines:
        lines.append("No tasks have been performed yet.")
        return "\n".join(lines)

    lines.append(" ")
    for engine in engines:
        lines.append(f"Engine: {engine}")
        for task, task_info in project_data[engine].items():
            # A task without a 'done' entry counts as not finished.
            if task_info.get('done'):
                lines.append(f" {task}")
        # Blank separator line after each engine section.
        lines.append(" ")
    return "\n".join(lines)