Source code for pyfurnace.generate.road

import os
import sys
import subprocess
import tempfile
import zipfile
import shutil
import warnings
from contextlib import contextmanager
from typing import Callable, Optional, Tuple, Union
import multiprocessing

### Local imports
from ..design.core.symbols import dot_bracket_to_pair_map, nucl_to_none
from .utils import find_stems_in_multiloop
from .pk_utils import parse_pseudoknots
from .viennarna import fold_p


@contextmanager
def _road_cwd(path: str):
    """Context manager to change the current working directory and revert it back."""
    oldpwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpwd)


[docs] def generate_road( structure: str, sequence: str, pseudoknots: Union[str, dict] = "", name: str = "origami", initial_sequence: Optional[str] = None, callback: Optional[Callable[[str, str, str, str, float], None]] = None, verbose: bool = False, timeout: int = 7200, directory: Optional[str] = None, zip_directory: bool = False, origami_code: Optional[str] = None, ) -> Union[str, Tuple[str, str]]: """ Generate an RNA origami design using the ROAD Revolvr algorithm. This function orchestrates the structure processing, pseudoknot embedding, and Perl-based design process for RNA origami, with optional support for real-time updates and zipped output. Parameters ---------- structure : str RNA secondary structure in dot-bracket notation. sequence : str Initial sequence to use for the design (must match structure length). pseudoknots : str or dict, optional Pseudoknot definitions in ROAD-compatible format (string or parsed dict). name : str, optional Base name for the design file (default is 'origami'). initial_sequence : str, optional Optional initial sequence to pre-fill as the starting point of the optimization. callback : callable, optional A function called during ROAD execution with progress updates: `callback(structure, sequence, line, stage_name, stage_progress)` verbose : bool, optional If True, prints progress updates to the console. If `callback` is provided, this is ignored. Default is False. timeout : int, optional Timeout in seconds for the ROAD optimization (default is 7200 seconds = 2 hours). directory : str, optional Directory where files should be written. If None, a temporary directory is used. zip_directory : bool, optional Whether to generate a `.zip` file with all intermediate and result files. origami_code : str, optional Source code (e.g., Python) representing the origami design, to be saved and zipped. Returns ------- str or tuple If `zip_directory` is False: returns the designed RNA sequence as a string. If `zip_directory` is True: returns the tuple `(designed_sequence, path_to_zip_file)`. Raises ------ ValueError If the structure and sequence lengths do not match, or if multistrand '&' is used. Warnings -------- - If the working directory doesn't exist, it is created. - If the final design file is missing, a warning is issued. """ road_dir = __file__.replace("road.py", "road_bin") files_to_include = [] # Sanity check if "&" in sequence or "&" in structure: raise ValueError( "The ROAD Revolvr algorithm does not support multistranded structures." ) if len(sequence) != len(structure): raise ValueError( "The length of the sequence and structure must match." " Got {} and {}.".format(len(sequence), len(structure)) ) if verbose and callback is None: def callback(_, __, spool_line, stage_name, *args): print(f"{stage_name}: {spool_line.strip()}", flush=True) ### Fix the short stems with '{' ROAD symbols struct_list = list(structure) pair_map = dot_bracket_to_pair_map(structure) for dt in find_stems_in_multiloop(structure): # force pairing in dovetails that are shorter than 3 if dt[-1] - dt[0] + 1 <= 2: for i in range(dt[0], dt[1] + 1): struct_list[i] = "{" struct_list[pair_map[i]] = "}" ### ADD THE PSEUDOKNOTS ROAD NOTATION avg_pk_E = 0 avg_pk_dE = 0 if not pseudoknots: avg_pk_E = -9.0 avg_pk_dE = 1.81 if isinstance(pseudoknots, dict): pk_dict = pseudoknots else: pk_dict = parse_pseudoknots(pseudoknots) road_pk_notation = { "A": "1", "B": "2", "C": "3", "D": "4", "E": "5", "F": "6", "G": "7", "H": "8", "I": "9", } external_pk_count = 0 # sort the pseudoknots by their index pk_dict = { k: v for k, v in sorted( pk_dict.items(), key=lambda item: min( inds[0] for inds in item[1]["ind_fwd"] + item[1]["ind_rev"] ), ) } # Calculate the average energy and dE of the pseudoknots for pk_info in pk_dict.values(): used = False avg_pk_E += pk_info["E"] avg_pk_dE += abs(pk_info["dE"]) pk_sym = list(road_pk_notation)[external_pk_count] # Find the pk with the lowest index, between fwd and rev min_fwd = min([inds[0] for inds in pk_info["ind_fwd"]], default=float("inf")) min_rev = min([inds[0] for inds in pk_info["ind_rev"]], default=float("inf")) if min_fwd < min_rev: first_pk = pk_info["ind_fwd"] second_pk = pk_info["ind_rev"] else: first_pk = pk_info["ind_rev"] second_pk = pk_info["ind_fwd"] for start, end in first_pk: if struct_list[start] not in ".()": continue for i in range(start, end + 1): struct_list[i] = pk_sym used = True for start, end in second_pk: if struct_list[start] not in ".()": continue for j in range(start, end + 1): struct_list[j] = road_pk_notation[pk_sym] used = True if used: external_pk_count += 1 if pk_dict: avg_pk_E /= len(pk_dict) avg_pk_dE /= len(pk_dict) structure = "".join(struct_list) ### COPY THE PYTHON PATH AND ADD RNAfold python_path = sys.executable # Get path to the current Python interpreter python_dir = os.path.dirname(python_path) # Prepend it to PATH, to make sure the correct Python is used env = os.environ.copy() env["PATH"] = python_dir + os.pathsep + os.environ["PATH"] ### CHECK THE TEMPORARY DIRECTORY if directory is None: tempdir = tempfile.TemporaryDirectory() directory = tempdir.name else: if not os.path.exists(directory): warnings.warn(f"Directory {directory} does not exist. Creating it.") os.makedirs(directory) tempdir = None # take the absolute path of the directory directory = os.path.abspath(directory) ### WORK IN THE DIRECTORY # save the origami code if origami_code is not None: origami_code_path = os.path.join(directory, f"{name}.py") with open(origami_code_path, "w", encoding="utf-8") as f: f.write(origami_code) # include it in the zip files_to_include.append(origami_code_path) # create the target input file target_path = os.path.join(directory, "target.txt") with open(target_path, "w", encoding="utf-8") as f: f.write(f"{name}\n{structure}\n{sequence}\n") if initial_sequence is not None and len(initial_sequence) == len(structure): f.write(f"{initial_sequence}\n") # include it in the zip files_to_include.append(target_path) # read the revolvr file revolvr_local_path = os.path.join(road_dir, "revolvr.pl") with open(revolvr_local_path, "r") as f: revolvr_text = f.read() # replace the KL energy parameters revolvr_text = revolvr_text.replace( "my $MinKL = -7.2;", f"my $MinKL = {avg_pk_E + avg_pk_dE};" ) revolvr_text = revolvr_text.replace( "my $MaxKL = -10.8;", f"my $MaxKL = {avg_pk_E - avg_pk_dE};" ) revolvr_text = revolvr_text.replace( "my $timeout_length = 7200;", f"my $timeout_length = {int(timeout)};" ) # create the revolvr file with specific KL parameters out_revolvr = os.path.join(directory, "revolvr.pl") with open(out_revolvr, "w", encoding="utf-8") as f: f.write(revolvr_text) # include it in the zip files_to_include.append(out_revolvr) vienna_out_path = os.path.join(directory, "viennarna_funcs.py") shutil.copyfile(os.path.join(road_dir, "viennarna_funcs.py"), vienna_out_path) # include it in the zip files_to_include.append(vienna_out_path) # move to the directory with _road_cwd(directory): command = f'perl revolvr.pl "{directory}"' process = subprocess.Popen( command, shell=True, cwd=directory, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, # makes output strings ) # Read output in real time last_seq = "" last_struct = "" prev_line = "" n_stage = 0 stages = [ "Designing", "GC-Reduction & GC/UA-Rich Reduction", "Mapping Kissing Loops", "Optimization", ] for line in process.stdout: line = line.strip() # update the stage if stages[n_stage] in line and n_stage < len(stages) - 1: n_stage += 1 # update the last sequence and structure if prev_line and not prev_line.translate(nucl_to_none): last_seq = prev_line if line and any(s in line for s in ".()"): last_struct = line if line and last_seq and last_struct and callback: callback( last_struct, last_seq, line, stages[n_stage - 1], n_stage / len(stages), ) prev_line = line # wait for process to finish process.wait() # spool file files_to_include.append(os.path.join(directory, f"{name}_spool.txt")) # read the results design_path = os.path.join(directory, f"{name}_design.txt") if not os.path.exists(design_path): last_seq = "" warnings.warn( f"Design file {design_path} not found. Optimization failed. " "Please check the ROAD algorithm output." ) else: with open(os.path.join(directory, f"{name}_design.txt"), "r") as f: lines = f.readlines() last_seq = lines[2].strip() # include it in the zip files_to_include.append(design_path) if zip_directory: # create a temporary zip file temp_zip = tempfile.NamedTemporaryFile(suffix=".zip", delete=False) temp_zip.close() # Close so we can write to it # Create the zip and add selected files with zipfile.ZipFile(temp_zip.name, "w", zipfile.ZIP_DEFLATED) as zipf: for file in files_to_include: try: # Store relative to directory for cleaner archive structure zipf.write(file, arcname=os.path.basename(file)) except Exception as e: warnings.warn(f"Failed to add {file} to zip: {e}") if tempdir is not None: # Close the temporary directory tempdir.cleanup() # Return the path to the zip file if zip_directory: return last_seq, temp_zip.name return last_seq
def _worker_single_road( structure: str, sequence: str, pseudoknots, name: str, callback: callable = None, verbose: bool = False, timeout: int = 7200, zip_directory: bool = False, origami_code: Optional[str] = None, initial_sequence: Optional[str] = None, result_queue: multiprocessing.Queue = None, stop_event: "multiprocessing.Event" = None, ): """ Internal worker function that runs a single ROAD design trial. This function wraps `generate_road` for parallel execution. It checks whether an early stop is requested via `stop_event`, runs the design pipeline, and returns the result via a shared `result_queue`. Parameters ---------- structure : str RNA secondary structure in dot-bracket notation. sequence : str Input sequence, same length as the structure. pseudoknots : str or dict Pseudoknot annotations in ROAD-compatible format. name : str Base name for generated files. callback : callable, optional Optional function for progress updates. verbose : bool, optional If True, prints updates to the console. timeout : int, optional Max time in seconds to allow ROAD to run. zip_directory : bool, optional Whether to store intermediate and output files in a ZIP archive. origami_code : str, optional Python code to be stored with the output files. initial_sequence : str, optional Optional prefilled sequence as the starting point. result_queue : multiprocessing.Queue, optional Queue for storing results from each trial. stop_event : multiprocessing.Event, optional Used to signal early stopping when a successful result is found. """ try: if stop_event.is_set(): return result = generate_road( structure=structure, sequence=sequence, pseudoknots=pseudoknots, name=name, initial_sequence=initial_sequence, callback=callback, verbose=verbose, timeout=timeout, zip_directory=zip_directory, origami_code=origami_code, ) if result: result_queue.put(result) stop_event.set() except Exception as e: print(f"Error in ROAD design process: {e}", file=sys.stderr)
[docs] def parallel_road( structure: str, sequence: str, pseudoknots="", name: str = "origami", callback=None, verbose=False, timeout: int = 7200, zip_directory: bool = True, origami_code: str = None, initial_sequence: str = None, n_trials: int = 8, save_to: Optional[str] = None, wait_for_all: bool = False, ) -> Union[str, Tuple[str, str]]: """ Run multiple ROAD optimization trials in parallel and return results. You can choose between returning early on the first successful result or waiting for all trials to complete. Results can optionally be ZIP-archived and saved. If the wait_for_all flag is set to True, the sequences are returned sorted by their MFE frequency in the ensemble (highest to lowest). Parameters ---------- structure : str Dot-bracket notation of the target RNA secondary structure. sequence : str Reference sequence with the same length as the structure. pseudoknots : str or dict, optional ROAD-style or pyFuRNAce-style pseudoknot definition. name : str, optional Base name for output files (default: "origami"). callback : callable, optional Optional function that receives real-time progress updates. verbose : bool, optional If True, prints progress to console. timeout : int, optional Timeout in seconds per trial (default: 7200). zip_directory : bool, optional Whether to save each trial's output as a ZIP archive. origami_code : str, optional Python source code to include with the design output. initial_sequence : str, optional Optional sequence to use as a starting point in optimization. n_trials : int, optional Number of parallel optimization trials to run. save_to : str, optional Directory to store ZIP outputs if `zip_directory=True`. wait_for_all : bool, optional If True, waits for all trials to finish and returns results sorted by MFE frequency. If False, returns as soon as the first trial succeeds. Returns ------- str or Tuple[str, str] or List[str] - If `wait_for_all=False` returns the first designed sequence. - If `wait_for_all=True` returns a list of designed sequences. Raises ------ RuntimeError If no trials successfully generate a design. """ manager = multiprocessing.Manager() result_queue = manager.Queue() stop_event = manager.Event() processes = [] for i in range(n_trials): p = multiprocessing.Process( target=_worker_single_road, args=( structure, sequence, pseudoknots, name, callback, verbose, timeout, zip_directory, origami_code, initial_sequence, result_queue, stop_event, ), ) processes.append(p) p.start() results = [] try: if wait_for_all: # Wait for all trials to finish and collect all successful results for _ in range(n_trials): try: result = result_queue.get(timeout=timeout) results.append(result) except Exception: # likely empty queue error pass # skip failed or timed-out results else: # Wait for the first successful result try: result = result_queue.get(timeout=timeout) results.append(result) except Exception: # likely empty queue error pass # no result within timeout stop_event.set() finally: for p in processes: p.terminate() for p in processes: p.join() if not results: raise RuntimeError("No successful ROAD design found in any trial.") # order the sequences from highest frequency of MFE in the ensemble to lowest results = sorted(results, key=lambda x: -fold_p(x[0])[2]) if zip_directory and isinstance(result, tuple) and save_to: # If saving zips and save_to is set, move them all if not os.path.exists(save_to): os.makedirs(save_to) warnings.warn(f"Directory {save_to} did not exist. Created it.") for i, res in enumerate(results): if isinstance(res, tuple): final_zip_path = os.path.join(save_to, f"{name}_trial{i + 1}.zip") shutil.move(res[1], final_zip_path) if wait_for_all: # return all sequences in a list return [res[0] for res in results if isinstance(res, tuple)] else: # return the first sequence return results[0][0] # Return appropriately if wait_for_all: return results else: return results[0]