Source code for aiida_mlip.workflows.ht_workgraph

"""Workgraph to run high-throughput calculations."""

from pathlib import Path
from typing import Callable, Union

from aiida.engine import CalcJob, WorkChain
from aiida.orm import Str
from aiida_workgraph import WorkGraph, task
from ase.io import read

from aiida_mlip.helpers.help_load import load_structure


@task.graph_builder(outputs=[{"name": "final_structures", "from": "context.structs"}])
def build_ht_calc(
    calc: Union[CalcJob, Callable, WorkChain, WorkGraph],
    folder: Union[Path, str, Str],
    calc_inputs: dict,
    input_struct_key: str = "struct",
    final_struct_key: str = "final_structure",
    recursive: bool = True,
) -> WorkGraph:
    """
    Build high throughput calculation WorkGraph.

    The `calc` must take a structure, by default `struct`, as one of its inputs.
    Tasks will then be created to carry out the calculation for each structure file in
    `folder`. Files that `ase.io.read` cannot read are skipped.

    Parameters
    ----------
    calc : Union[CalcJob, Callable, WorkChain, WorkGraph]
        Calculation to be performed on all structures.
    folder : Union[Path, str, Str]
        Path to the folder containing input structure files.
    calc_inputs : dict
        Dictionary of inputs, shared by all the calculations. Should not contain
        `input_struct_key`; any such entry is overridden by the loaded structure.
        The dictionary itself is not modified.
    input_struct_key : str
        Keyword for input structure for `calc`. Default is "struct".
    final_struct_key : str
        Key for final structure output from `calc`. Default is "final_structure".
    recursive : bool
        Whether to search `folder` recursively. Default is True.

    Returns
    -------
    WorkGraph
        The workgraph with calculation tasks for each structure.

    Raises
    ------
    FileNotFoundError
        If `folder` has no valid structure files.
    """
    wg = WorkGraph()
    # Doubles as the "at least one structure was found" marker checked below.
    structure = None

    # Normalise every accepted folder type to a pathlib.Path.
    if isinstance(folder, Str):
        folder = Path(folder.value)
    if isinstance(folder, str):
        folder = Path(folder)

    pattern = "**/*" if recursive else "*"
    for file in filter(Path.is_file, folder.glob(pattern)):
        # Probe the file with ASE first; anything it cannot parse is deliberately
        # skipped (best effort) rather than aborting the whole graph.
        try:
            read(file)
        except Exception:
            continue

        structure = load_structure(file)

        # Build the per-task inputs in a fresh dict so the caller's `calc_inputs`
        # is never mutated.
        task_inputs = {**calc_inputs, input_struct_key: structure}

        # NOTE(review): the task name and the context key both derive from
        # `file.stem`, so files sharing a stem (e.g. in different sub-folders or
        # with different extensions) map to the same name -- confirm how
        # WorkGraph handles such duplicates.
        calc_task = wg.add_task(
            calc,
            name=f"calc_{file.stem}",
            **task_inputs,
        )
        calc_task.set_context({final_struct_key: f"structs.{file.stem}"})

    if structure is None:
        raise FileNotFoundError(
            f"{folder} is empty or has no readable structure files."
        )

    return wg
def get_ht_workgraph(
    calc: Union[CalcJob, Callable, WorkChain, WorkGraph],
    folder: Union[Path, str, Str],
    calc_inputs: dict,
    input_struct_key: str = "struct",
    final_struct_key: str = "final_structure",
    recursive: bool = True,
    max_number_jobs: int = 10,
) -> WorkGraph:
    """
    Create the top-level WorkGraph that runs `calc` on every structure in `folder`.

    A single ``ht_calc`` task, built by `build_ht_calc`, is added to a WorkGraph
    named ``ht_calculation``, and its ``final_structures`` output is exposed as a
    grouped output of the returned graph.

    Parameters
    ----------
    calc : Union[CalcJob, Callable, WorkChain, WorkGraph]
        Calculation to be performed on all structures.
    folder : Union[Path, str, Str]
        Path to the folder containing input structure files.
    calc_inputs : dict
        Inputs shared by all the calculations; forwarded unchanged to
        `build_ht_calc`.
    input_struct_key : str
        Keyword for input structure for `calc`. Default is "struct".
    final_struct_key : str
        Key for final structure output from `calc`. Default is "final_structure".
    recursive : bool
        Whether to search `folder` recursively. Default is True.
    max_number_jobs : int
        Value assigned to the WorkGraph's `max_number_jobs` attribute.
        Default is 10.

    Returns
    -------
    WorkGraph
        The configured workgraph.
    """
    # Everything the graph builder needs, forwarded verbatim from our arguments.
    builder_kwargs = {
        "calc": calc,
        "folder": folder,
        "calc_inputs": calc_inputs,
        "input_struct_key": input_struct_key,
        "final_struct_key": final_struct_key,
        "recursive": recursive,
    }

    workgraph = WorkGraph("ht_calculation")
    workgraph.add_task(build_ht_calc, name="ht_calc", **builder_kwargs)

    # Surface the builder task's collected structures on the outer graph.
    grouped_output = {"name": "final_structures", "from": "ht_calc.final_structures"}
    workgraph.group_outputs = [grouped_output]
    workgraph.max_number_jobs = max_number_jobs

    return workgraph