# Source code for poolparty.base_ops.shuffle_seq

"""SeqShuffle operation - shuffle characters within a sequence region."""

from numbers import Real

import numpy as np

from ..operation import Operation
from ..pool import Pool
from ..types import CardsType, Integral, Literal, ModeType, Optional, Pool_type, RegionType, Seq, Union, beartype
from ..utils.dna_seq import DnaSeq


@beartype
def shuffle_seq(
    pool: Union[Pool_type, str],
    region: RegionType = None,
    shuffle_type: Literal["mono", "dinuc"] = "mono",
    prefix: Optional[str] = None,
    mode: ModeType = "random",
    num_states: Optional[Integral] = None,
    iter_order: Optional[Real] = None,
    _remove_tags: bool = False,
    style: Optional[str] = None,
    cards: CardsType = None,
    _factory_name: Optional[str] = None,
) -> Pool:
    """
    Build a Pool whose sequences have the characters of a region shuffled.

    Parameters
    ----------
    pool : Pool_type or str
        Parent pool to shuffle. A plain string is first wrapped into a pool
        with ``from_seq``.
    region : RegionType, default=None
        Region to shuffle. Can be a marker name (str), an explicit interval
        [start, stop], or None to shuffle the entire sequence.
    shuffle_type : Literal["mono", "dinuc"], default="mono"
        Kind of shuffle:

        - ``"mono"``: random permutation preserving mononucleotide
          composition.
        - ``"dinuc"``: Euler-path shuffle preserving dinucleotide
          frequencies. The first and last characters are always fixed
          (mathematical constraint of the Euler path algorithm).
    prefix : Optional[str], default=None
        Prefix for sequence names in the resulting Pool.
    mode : ModeType, default='random'
        Shuffle mode. Only ``'random'`` is accepted; sequential is not
        supported.
    num_states : Optional[int], default=None
        Number of states for random mode. Handed to the operation unchanged;
        ``None`` is the pure random sampling setting.
    iter_order : Optional[Real], default=None
        Iteration order priority for the Operation.
    _remove_tags : bool, default=False
        Private pass-through forwarded to ``SeqShuffleOp``.
    style : Optional[str], default=None
        Style to apply to shuffled characters (e.g., 'red', 'blue bold').
    cards : list[str] or dict, optional
        Design card keys to include. Available keys: ``'permutation'``.
    _factory_name : Optional[str], default=None
        Private pass-through forwarded to ``SeqShuffleOp``; when given it
        replaces the operation's ``factory_name``.

    Returns
    -------
    Pool
        A pool of the same class as the (possibly wrapped) input pool,
        driven by a ``SeqShuffleOp``.
    """
    # Imported at call time, as in the rest of this function's history
    # (presumably to sidestep a circular import - not verifiable from here).
    from ..fixed_ops.from_seq import from_seq

    if isinstance(pool, str):
        source_pool = from_seq(pool)
    else:
        source_pool = pool

    shuffle_op = SeqShuffleOp(
        parent_pool=source_pool,
        region=region,
        shuffle_type=shuffle_type,
        prefix=prefix,
        mode=mode,
        num_states=num_states,
        name=None,
        iter_order=iter_order,
        _remove_tags=_remove_tags,
        style=style,
        cards=cards,
        _factory_name=_factory_name,
    )

    # Instantiate the same Pool subclass the caller handed in so the pool
    # type survives the operation.
    return type(source_pool)(operation=shuffle_op)
class SeqShuffleOp(Operation):
    """Randomly shuffle characters within a region of the parent sequence.

    Only ``mode='random'`` is supported. Each computed sequence comes with a
    design card holding one key, ``'permutation'``: a tuple in which entry
    ``i`` is the index (among the shuffled positions) where the ``i``-th
    input character ended up.
    """

    factory_name = "shuffle_seq"
    design_card_keys = ["permutation"]

    def __init__(
        self,
        parent_pool: Pool,
        region: RegionType = None,
        shuffle_type: Literal["mono", "dinuc"] = "mono",
        spacer_str: str = "",
        prefix: Optional[str] = None,
        mode: ModeType = "random",
        num_states: Optional[Integral] = None,
        name: Optional[str] = None,
        iter_order: Optional[Real] = None,
        _remove_tags: bool = False,
        style: Optional[str] = None,
        cards: CardsType = None,
        _factory_name: Optional[str] = None,
    ) -> None:
        """Initialize SeqShuffleOp.

        Parameters
        ----------
        parent_pool : Pool
            Pool whose sequences are shuffled; its ``seq_length`` is reused
            for this operation.
        region : RegionType, default=None
            Region to shuffle, forwarded to the base ``Operation``.
        shuffle_type : Literal["mono", "dinuc"], default="mono"
            ``"mono"`` for a plain random permutation, ``"dinuc"`` for a
            dinucleotide-preserving shuffle.
        spacer_str : str, default=""
            Accepted for interface compatibility; not used by this class.
        prefix : Optional[str], default=None
            Sequence-name prefix, forwarded to the base ``Operation``.
        mode : ModeType, default="random"
            Must be ``"random"``.
        num_states : Optional[int], default=None
            Forwarded unchanged to the base ``Operation`` (``None`` is the
            pure random setting).
        name : Optional[str], default=None
            Operation name, forwarded to the base ``Operation``.
        iter_order : Optional[Real], default=None
            Iteration order priority, forwarded to the base ``Operation``.
        _remove_tags : bool, default=False
            Forwarded to the base ``Operation`` as ``remove_tags``.
        style : Optional[str], default=None
            Style applied to the shuffled positions of the output.
        cards : CardsType, default=None
            Design card selection, forwarded to the base ``Operation``.
        _factory_name : Optional[str], default=None
            When given, overrides ``factory_name`` on this instance.

        Raises
        ------
        ValueError
            If ``mode`` is anything other than ``"random"``.
        """
        if mode != "random":
            raise ValueError(
                f"mode={mode!r} is not supported for shuffle_seq. Only mode='random' is allowed."
            )

        # Instance-level override; the class attribute stays untouched.
        if _factory_name is not None:
            self.factory_name = _factory_name

        self._shuffle_type = shuffle_type
        self._style = style

        # mode is guaranteed to be "random" past the guard above, so
        # num_states is forwarded exactly as the caller supplied it.
        super().__init__(
            parent_pools=[parent_pool],
            num_states=num_states,
            mode=mode,
            seq_length=parent_pool.seq_length,
            name=name,
            iter_order=iter_order,
            prefix=prefix,
            region=region,
            remove_tags=_remove_tags,
            cards=cards,
        )

    def _compute_core(
        self,
        parents: list[Seq],
        rng: Optional[np.random.Generator] = None,
    ) -> tuple[Seq, dict]:
        """Return shuffled Seq and design card.

        Note: Region handling is done by base class compute() method.
        parents[0] is the region content when region is specified.

        Parameters
        ----------
        parents : list[Seq]
            Parent sequences; only ``parents[0]`` is read (its ``string``
            and ``style``).
        rng : Optional[np.random.Generator], default=None
            Random generator driving the shuffle. Required.

        Returns
        -------
        tuple[Seq, dict]
            The shuffled sequence and ``{"permutation": permutation}``, where
            ``permutation[i]`` is the index among the shuffled positions at
            which input character ``i`` was placed (empty tuple when there is
            nothing to shuffle).

        Raises
        ------
        RuntimeError
            If the operation's mode is not ``"random"`` or ``rng`` is None.
        """
        if self.mode != "random":
            raise RuntimeError(f"Unsupported mode {self.mode!r}")
        if rng is None:
            raise RuntimeError(
                f"{self.mode.capitalize()} mode requires RNG - use Party.generate(seed=...)"
            )

        seq = parents[0].string

        # Per-instance caches keyed by the input string: molecular positions,
        # the same positions as an index array, and the string as a char array.
        # NOTE(review): entries are never evicted, so these grow with the
        # number of distinct input strings seen by this operation.
        if not hasattr(self, "_mol_pos_cache"):
            self._mol_pos_cache = {}
            self._pos_arr_cache = {}
            self._seq_arr_cache = {}
        if seq not in self._mol_pos_cache:
            # _get_molecular_positions is not defined in this class;
            # presumably inherited from Operation.
            self._mol_pos_cache[seq] = self._get_molecular_positions(seq)
            self._pos_arr_cache[seq] = np.array(self._mol_pos_cache[seq], dtype=np.intp)
            self._seq_arr_cache[seq] = np.array(list(seq), dtype="U1")

        molecular_positions = self._mol_pos_cache[seq]
        pos_arr = self._pos_arr_cache[seq]
        num_molecular = len(molecular_positions)

        if num_molecular == 0:
            # Nothing to shuffle: pass the string through untouched.
            permutation = tuple()
            shuffled_seq = seq
        elif self._shuffle_type == "dinuc":
            from collections import defaultdict, deque

            from ..utils.shuffle_utils import dinucleotide_shuffle

            mol_str = "".join(self._seq_arr_cache[seq][pos_arr])
            shuffled_mol = dinucleotide_shuffle(mol_str, rng)

            # Work on a copy so the cached array is never mutated.
            seq_arr = self._seq_arr_cache[seq].copy()
            seq_arr[pos_arr] = list(shuffled_mol)
            shuffled_seq = "".join(seq_arr)

            # Compute permutation: greedily match each output char to the
            # first still-unused input position holding that character.
            # deque.popleft() keeps this linear; list.pop(0) would shift the
            # remaining items on every pop.
            char_positions: dict[str, deque] = defaultdict(deque)
            for i, c in enumerate(mol_str):
                char_positions[c].append(i)
            order = np.array(
                [char_positions[c].popleft() for c in shuffled_mol], dtype=np.intp
            )
            # order maps output index -> input index; argsort inverts it.
            permutation = tuple(np.argsort(order).tolist())
        else:
            # order maps output index -> input index.
            order = rng.permutation(num_molecular)
            # Use argsort to compute inverse permutation (vectorized).
            permutation = tuple(np.argsort(order).tolist())

            # Vectorized character shuffling on a copy of the cached array.
            seq_arr = self._seq_arr_cache[seq].copy()
            molecular_chars = seq_arr[pos_arr]
            seq_arr[pos_arr] = molecular_chars[order]
            shuffled_seq = "".join(seq_arr)

        # Pass through parent styles and add styling to shuffled characters
        # if requested.
        output_style = parents[0].style
        if output_style is not None and self._style and num_molecular:
            output_style = output_style.add_style(
                self._style, np.array(molecular_positions, dtype=np.int64)
            )

        output_seq = DnaSeq(shuffled_seq, output_style)
        return output_seq, {
            "permutation": permutation,
        }