# Source code for poolparty.fixed_ops.from_fasta

"""FromFasta operation - create a pool from genomic region(s) in a FASTA file."""

from numbers import Real

from pyfaidx import Fasta

from ..dna_pool import DnaPool
from ..pool import Pool
from ..types import CardsType, Literal, Optional, Pool_type, RegionType, Sequence, Union, beartype
from ..utils import dna_utils

# Type alias for a single genomic coordinate tuple: (chrom, start, stop, strand).
#   chrom  - sequence/record name used to index the FASTA file
#   start  - start position (from_fasta documents coordinates as 0-based)
#   stop   - stop position (documented as exclusive, i.e. [start, stop))
#   strand - "+" or "-"; "-" requests the reverse complement
Coordinate = tuple[str, int, int, Literal["+", "-"]]


def _extract_sequence(
    fasta: Fasta,
    chrom: str,
    start: int,
    stop: int,
    strand: Literal["+", "-"],
) -> str:
    """Return the sequence of one region of an open FASTA file.

    Parameters
    ----------
    fasta : Fasta
        Open pyfaidx handle, indexed by chromosome name.
    chrom : str
        Name of the record to read from.
    start, stop : int
        Slice bounds applied to the record. When ``start > stop`` the region
        is treated as crossing the origin of a circular sequence: the tail
        ``[start, len)`` is followed by the head ``[0, stop)``.
    strand : {"+", "-"}
        ``"-"`` returns the reverse complement of the extracted sequence.

    Returns
    -------
    str
        The extracted (and possibly reverse-complemented) sequence.
    """
    record = fasta[chrom]

    if start > stop:
        # Wrap-around: read to the end of the record, then continue from 0.
        record_len = len(record)
        pieces = (record[start:record_len], record[0:stop])
    else:
        pieces = (record[start:stop],)

    forward = "".join(str(piece.seq) for piece in pieces)

    return dna_utils.reverse_complement(forward) if strand == "-" else forward


def _is_single_coordinate(coordinates) -> bool:
    """Check if coordinates is a single (chrom, start, stop, strand) tuple."""
    if not isinstance(coordinates, tuple | list):
        return False
    if len(coordinates) != 4:
        return False
    # Single coordinate: first element is string (chrom), rest are int/int/str
    return isinstance(coordinates[0], str) and isinstance(coordinates[1], int)


@beartype
def from_fasta(
    fasta_path: str,
    coordinates: Union[Coordinate, Sequence[Coordinate]],
    pool: Optional[Union[Pool, str]] = None,
    region: RegionType = None,
    remove_tags: Optional[bool] = None,
    iter_order: Optional[Real] = None,
    prefix: Optional[str] = None,
    style: Optional[str] = None,
    cards: CardsType = None,
) -> DnaPool:
    """
    Extract genomic region(s) from a FASTA file and create a Pool.

    Parameters
    ----------
    fasta_path : str
        Path to the FASTA file (will be indexed with pyfaidx).
    coordinates : tuple or list of tuples
        Single coordinate as (chrom, start, stop, strand) or list of such tuples.
        Coordinates are 0-based [start, stop). If strand='-', sequence is
        reverse complemented. For circular genomes, start > stop indicates
        wrap-around.
    pool : Optional[Union[Pool, str]], default=None
        Background pool or sequence. If provided with region, extracted
        sequence(s) replace the region content.
    region : RegionType, default=None
        Region to replace in pool. Can be a marker name or [start, stop]
        interval. Required if pool is provided.
    remove_tags : Optional[bool], default=None
        If True and region is a marker name, remove marker tags from the output.
        Only relevant in single-coordinate mode (has no effect in batch mode,
        where it is not forwarded).
    iter_order : Optional[Real], default=None
        Iteration order priority for the Operation. Documented as relevant in
        batch mode; the value is also passed through to ``from_seq`` in
        single-coordinate mode.
    prefix : str, optional
        Prefix for sequence names. In batch mode names are
        "{prefix}_{chrom}:{start}-{stop}({strand})", or
        "{chrom}:{start}-{stop}({strand})" if no prefix is given. In
        single-coordinate mode the prefix is passed to ``from_seq`` unchanged.
    style : Optional[str], default=None
        Style to apply to extracted sequences (e.g., 'red', 'blue bold').
    cards : list[str] or dict[str, str], optional
        Design card keys to include. Available keys (batch mode only):
        ``'seq_name'``, ``'seq_index'``. Ignored in single-coordinate mode.

    Returns
    -------
    DnaPool
        A Pool yielding the extracted genomic sequence(s).

    Raises
    ------
    RuntimeError
        If there is no active Party context.
    ValueError
        In batch mode, if any coordinate does not have exactly 4 elements.
    """
    # Imported inside the function, as in the original; presumably to avoid
    # import cycles between the operation modules -- confirm before hoisting.
    from ..base_ops.from_seqs import from_seqs
    from ..party import get_active_party
    from .from_seq import from_seq

    if get_active_party() is None:
        raise RuntimeError(
            "from_fasta requires an active Party context. "
            "Use 'with pp.Party() as party:' to create one."
        )

    if _is_single_coordinate(coordinates):
        # Single region mode: one sequence, delegated to from_seq.
        chrom, start, stop, strand = coordinates
        with Fasta(fasta_path) as fasta:
            seq = _extract_sequence(fasta, chrom, start, stop, strand)

        return from_seq(
            seq=seq,
            pool=pool,
            region=region,
            remove_tags=remove_tags,
            iter_order=iter_order,
            prefix=prefix,
            style=style,
            _factory_name="from_fasta",
        )

    # Batch mode: a sequence of coordinate tuples.
    coords_list = list(coordinates)

    # Validate every tuple before touching the file so a malformed entry
    # fails with a clear message instead of an unpacking error.
    for i, coord in enumerate(coords_list):
        if len(coord) != 4:
            raise ValueError(
                f"Coordinate at index {i} must be (chrom, start, stop, strand), "
                f"got {len(coord)} elements"
            )

    # Open the FASTA once and extract all regions in input order.
    with Fasta(fasta_path) as fasta:
        seqs = [
            _extract_sequence(fasta, chrom, start, stop, strand)
            for chrom, start, stop, strand in coords_list
        ]

    # Names: "{prefix}_{chrom}:{start}-{stop}({strand})", or without the
    # "{prefix}_" part when prefix is None or empty.
    name_prefix = f"{prefix}_" if prefix else ""
    seq_names = [
        f"{name_prefix}{chrom}:{start}-{stop}({strand})"
        for chrom, start, stop, strand in coords_list
    ]

    return from_seqs(
        seqs=seqs,
        pool=pool,
        region=region,
        style=style,
        seq_names=seq_names,
        mode="sequential",
        iter_order=iter_order,
        cards=cards,
        _factory_name="from_fasta",
    )