"""Module for querying Ensembl API.
This module also contains code to read the VCF file and prepare
the data for querying the Ensembl API.
"""
import contextlib
import csv
import functools
import io
import json
import logging
import pathlib
import time
import typing
import httpx
import pysam
from varona import enum
logger = logging.getLogger("varona.ensembl")
[docs]
class Assembly(enum.CiStrEnum):
"""Enum for the assembly choice.
There are only two choices for now, GRCh37 and GRCh38.
"""
GRCH37 = "GRCH37" # aka hg19 or human_g1k_v37
GRCH38 = "GRCH38" # aka hg38
VEP_DEFAULT_PARAMS = {
"species": "human",
"variant_class": True,
"pick": True,
}
VEP_MAX_CHUNK = 200
HTS_COMPRESSED_VALS = ("GZIP", "BGZF")
"""Possible values for the compression field in a :class:`pysam.VariantFile`.
"""
VCF_MASK_COLS = [2, 5, 6, 7]
"""Columns to mask in the VCF file.
Values in these columns are replaced with '.' before querying the
Ensembl API.
"""
API_LONG_RETRY = 60
"""Delay retrying an API call after a 429 response without Retry-After header.
Currently this is the only place in the code where such a delay is defined. In
the future, more settings for the httpx client may be desirable.
"""
@contextlib.contextmanager
def _text_from_bgzf(file_path, mode="r"):
"""Wraps a UTF-8 byte decoder around a BGZF-decompressor.
This is just a helper to open .vcf.gz files with the same call signature
as :func:`open` would on a plain .vcf file.
"""
with (
pysam.BGZFile(file_path, mode) as bgzf,
io.TextIOWrapper(bgzf, encoding="utf-8") as text,
):
yield text
[docs]
def vcf_rows(vcf_path: pathlib.Path) -> typing.Iterator[list[str]]:
"""Yields rows from a VCF file.
This function opens a VCF file that's either plain text or compressed.
Normally the :class:`pysam.VariantFile` class would be used to read
VCF files but in this case there's no need to parse the VCF fully as
VariantFile does, it's just a matter of reading tab-separated lines
of text.
This function could be moved to a more general module in the future,
but currently it's only needed by the Ensembl API querying code.
:param vcf_path: Path to the VCF file.
:return: Iterator of rows from the VCF file.
"""
file_opener = open
with pysam.VariantFile(vcf_path, "r") as vf:
if not vf.is_vcf:
raise ValueError(f"{vcf_path} does not appear to be a VCF file.")
if vf.compression in HTS_COMPRESSED_VALS:
file_opener = _text_from_bgzf
with file_opener(vcf_path, "r") as vcf:
reader = csv.reader(vcf, delimiter="\t")
for row in reader:
if row and len(row) >= 1 and row[0].startswith("#"):
continue
yield row
[docs]
def vcf_to_vep_query_data(
vcf_path: pathlib.Path, chunk_size=VEP_MAX_CHUNK
) -> typing.Iterator[list[str]]:
"""Gets lists of data in chunks from the VCF file for querying the Ensembl API.
This is tailored to provide input to the
`POST vep/:species/region <https://rest.ensembl.org/documentation/info/vep_region_post>`_
endpoint.
:param vcf_path: Path to the VCF file.
:param chunk_size: Maximum number of variants to include in each chunk.
"""
chunk = []
for row in vcf_rows(vcf_path):
for col in VCF_MASK_COLS:
row[col] = "."
chunk.append(" ".join(row[:8]))
if len(chunk) == chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
[docs]
def query_vep_api(
client: httpx.Client,
chunk: list[str],
assembly: Assembly = Assembly.GRCH37,
retries=3,
params: dict | None = VEP_DEFAULT_PARAMS,
response_extractor: typing.Callable[[dict], dict] | None = None,
):
"""Queries the Ensembl Variant Effect Predictor (VEP) API.
The API endpoint is `POST vep/:species/region
<https://rest.ensembl.org/documentation/info/vep_region_post>`_. The API has
several limits on this endpoint, including a maximum of 200 variants per
request, and may return a 429 status code if the rate limit is exceeded. This
function can retry the request multiple times, using the Retry-After header if
it's present to delay the next request.
This function uses the synchronous client from the
`httpx <https://www.python-httpx.org/>`_ library, which is not as powerful as
the asynchronous client. A future version of this function may use the
asynchronous client to improve performance.
:param chunk: List of (up to 200) strings from the VCF file. See
:func:`get_vcf_query_data` for the format of these strings.
:param assembly: Assembly to use for the API query, by default GRCh37.
:param retries: Number of times to retry the API call if it returns a 429 status
code.
:param params: Additional parameters to pass to the API. By default, this
includes the `species=human` `variant_class=true`, and `pick=true`
parameters. `pick=true` is helpful for simplifying the consequences
of the variants and make it easier to assign a single annotation derived
from the consequences to the variant.
:param response_extractor: Optional function to transform data from the API
response. Without this, there is probably more fields in the response
with fields without the desired names. Additionally, the default dict
items returned have additional structure (sublist, subdicts) that is
preferably flattened by this function.
"""
subdomain = "grch37.rest" if assembly == Assembly.GRCH37 else "rest"
url = f"https://{subdomain}.ensembl.org/vep/homo_sapiens/region"
headers = {"Content-Type": "application/json", "Accept": "application/json"}
data = json.dumps({"variants": chunk})
retried = 0
done = False
while not done and retried <= retries:
response = client.post(url, headers=headers, data=data, params=params)
if response.status_code == 200:
items = response.json()
done = True
# optionally transform data
if not response_extractor:
response_extractor = lambda x: x # null operation
yield from map(response_extractor, items)
elif response.status_code == 429:
retried += 1
retry_after = response.headers.get("Retry-After")
if retry_after:
sleep_time = int(retry_after)
logger.warn(
f"API code 429 with Retry-After. Retrying after {sleep_time} seconds."
)
time.sleep(sleep_time)
else:
logger.warn(
f"API code 429 with no Retry-After. Retrying after {API_LONG_RETRY} seconds."
)
time.sleep(sleep_time)
else:
response.raise_for_status()
if not done:
raise TimeoutError("too many retries")
[docs]
def import_vep_data(
json_path: pathlib.Path,
json_extractor: typing.Callable[[dict], dict] | None = None,
):
"""Imports VEP data from a JSON file where VEP was run locallly.
At the record level, the format of the JSON file is the same as the API
response, where each line is a record dictionary.
:param json_path: Path to the JSON file.
:param json_extractor: Optional function to transform data from the API
response. Without this, there is probably more fields in the response
with fields without the desired names. Additionally, the default dict
items returned have additional structure (sublist, subdicts) that is
preferably flattened by this function.
"""
file_opener = (
functools.partial(pysam.BGZFile, mode="r")
if json_path.suffix == ".gz"
else functools.partial(open, mode="rb")
)
if not json_extractor:
json_extractor = lambda x: x
with file_opener(json_path) as json_f:
for line in json_f:
line = line.decode("utf-8").strip().replace("\t", "\\t")
if line == "":
continue
record = json.loads(line)
if json_extractor:
record = json_extractor(record)
yield record