"""High-level module for building a DataFrame from Platypus-style VCF.
This module calls functions from the :mod:`varona.dataframe` module to build
a DataFrame from a Platypus-style VCF file.
"""
import functools
import logging
import pathlib
import httpx
import polars as pl
from varona import bcftools, dataframe, ensembl, extract, maf
logger = logging.getLogger("varona.platypus")
API_DF_SCHEMA = {
"contig": pl.Utf8,
"pos": pl.UInt32,
"ref": pl.Utf8,
"alt": pl.Utf8,
"type": pl.Utf8,
"effect": pl.Utf8,
"gene_name": pl.Utf8,
"gene_id": pl.Utf8,
"transcript_id": pl.Utf8,
}
"""Polars schema for the API DataFrame."""
VCF_DF_SCHEMA = {
"contig": pl.Utf8,
"pos": pl.UInt32,
"ref": pl.Utf8,
"alt": pl.Utf8,
"sequence_depth": pl.UInt64,
"max_variant_reads": pl.UInt64,
"variant_read_pct": pl.Float64,
"maf": pl.Float64,
}
"""Polars schema for the VCF DataFrame."""
[docs]
def platypus_dataframe(
vcf_path: pathlib.Path,
maf_method: maf.MafMethod = maf.MafMethod.SAMPLES,
timeout: int = 300,
genome_assembly: ensembl.Assembly = ensembl.Assembly.GRCH37,
vcf_extractor=extract.platypus_vcf_record_extractor,
api_extractor=extract.default_vep_response_extractor,
no_vep: bool = False,
vep_json_path: pathlib.Path | None = None,
) -> pl.DataFrame:
"""Read a Platypus VCF file into a DataFrame.
:param vcf_path: The path to the Platypus VCF file.
:param maf_method: The method to use for calculating the MAF.
:param timeout: The timeout (seconds) for the VEP API query.
:param genome_assembly: The genome assembly used in the Ensembl VEP API.
:param vcf_extractor: The function to extract data from the VCF.
:param api_extractor: The function to extract data from the VEP API response.
:param no_vep: Skip querying the VEP API.
:param vep_json_path: Path to the VEP output file from running VEP locally,
(bypasses querying API).
:return: A DataFrame with the VCF data.
"""
# VCF part
lst = []
maf_func = functools.partial(maf.maf_from_method, method=maf_method)
if bcftools.HAVE_BCFTOOLS and maf_method == maf.MafMethod.BCFTOOLS:
with bcftools.VariantFileFilledInTags(vcf_path, fillin_tags=["MAF"]) as vf:
for record in vf:
new_item = vcf_extractor(record, maf=maf_func)
lst.append(new_item)
else:
lst = list(dataframe._vcf_rows(vcf_path, vcf_extractor))
vcf_df = pl.DataFrame(lst, schema=VCF_DF_SCHEMA)
if no_vep and vep_json_path is None:
# no_vep ignored if vep_json_path is provided
return vcf_df
vep_df = None
if vep_json_path:
vep_df = pl.DataFrame(
list(
ensembl.import_vep_data(
vep_json_path,
json_extractor=extract.default_vep_cli_json_extractor,
)
),
schema=API_DF_SCHEMA,
)
else:
# API part
chunks = list(ensembl.vcf_to_vep_query_data(vcf_path))
n_chunks = len(chunks)
vep_df = pl.DataFrame(
{k: [] for k in API_DF_SCHEMA.keys()}, schema=API_DF_SCHEMA
)
with httpx.Client(
limits=httpx.Limits(max_connections=5, max_keepalive_connections=5),
timeout=httpx.Timeout(float(timeout)),
) as client:
for ix, chunk in enumerate(chunks, start=1):
chunk_df = dataframe.vep_api_dataframe(
client, chunk, genome_assembly, api_extractor, schema=API_DF_SCHEMA
)
vep_df = vep_df.vstack(chunk_df)
logger.info(f"processed {ix}/{n_chunks} chunks from VEP API")
vep_df.rechunk()
combined_df = vcf_df.join(vep_df, on=["contig", "pos", "ref", "alt"], how="left")
return combined_df