"""
Map GWAS
A set of functions to find GWAS files in subfolder and
to map columns
"""
import os
import sys
import pandas as pd
import numpy as np
import gzip
import re
[docs]
def walkfs(startdir, findfile):
"""
Go through the folder and subfolder to find the specified file
Args:
startdir (str): path of the folder to explore
findfile (str): name of the file to find
"""
dircount = 0
filecount = 0
for root, dirs, files in os.walk(startdir):
if findfile in files:
return dircount, filecount + files.index(findfile), os.path.join(root, findfile)
dircount += 1
filecount += len(files)
# nothing found, return None instead of a full path for the file
return dircount, filecount, None
[docs]
def gwas_internal_link(GWAS_table, GWAS_path):
"""
Walk the GWAS path to find the GWAS tables
Args:
GWAS_table (str): path of the folder to explore
findfile (str): name of the file to find
Return:
a pandas dataframe with one column for the filename
and one column containing the complete path to the file
"""
Glink = []
for GWAS in range(0, len(GWAS_table)):
GWAS_filename = GWAS_table[GWAS]
Glink.append({'filename': GWAS_filename,
'internalDataLink': walkfs(GWAS_path, GWAS_filename)[2]})
Glink = pd.DataFrame(Glink, columns=('filename', 'internalDataLink'))
return Glink
[docs]
def convert_missing_values(df):
"""
Convert all missing value strings to a standart np.nan value
Args:
GWAS_table (pandas dataframe): GWAS data as a dataframe
Return:
a pandas dataframe with missing value all equal to np.nan
"""
def_missing = ['', '#N/A', '#N/A', 'N/A', '#NA', '-1.#IND', '-1.#QNAN',
'-NaN',
'-nan', '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN',
'nan', 'na', '.']
nmissing = len(def_missing)
nan_vec = ["NA"] * nmissing
return df.replace(def_missing, nan_vec)
[docs]
def map_columns_position(gwas_internal_link, column_dict):
"""
Find column position for each specific Gwas
Args:
gwas_internal_link (str): filename of the GWAS data (with path)
GWAS_labels (pd.DataFrame): corresponding row of the information file
Return:
pandas Series with column position and column names as index
"""
gwas_file = gwas_internal_link.split('/')[-1]
#Our standart labels:
reference_label = column_dict.index.tolist()
# labels in the GWAS files
target_lab = pd.Index(column_dict.values.tolist())
is_gzipped = re.search(r".gz$", gwas_internal_link)
if is_gzipped:
f = gzip.open(gwas_internal_link)
line = f.readline()
line = line.decode('utf-8')
else:
f = open(gwas_internal_link)
line = f.readline()
count_line = 0
header = pd.Index(line.split())
def get_position(I,x):
try:
position_in_header = I.get_loc(x)
if isinstance(position_in_header, int):
return position_in_header
else:
raise IndexError("{0} is a not corresponding to an unique column in {1}. Check that column names are unique in the header of {1} Summary Statistics".format(x, gwas_file))
except KeyError:
return np.nan
label_position = [get_position(header,i) for i in target_lab]
mapgw = pd.Series(label_position, index=reference_label)
mapgw = mapgw.loc[~mapgw.isna()].astype(int)
mapgw.sort_values(inplace=True)
f.close()
return mapgw
[docs]
def read_gwas( gwas_internal_link, column_map, imputation_treshold=None):
"""
Read gwas raw data, fetch columns thanks to position stored in
column_map and rename columns according to column_map.index
Args:
gwas_internal_link (str): GWAS data as a dataframe
column_map (pandas Series): Series containing the position of column in
the raw data
Return:
a pandas dataframe with missing value all equal to np.nan
"""
print("Reading file:")
print(gwas_internal_link)
is_gzipped = re.search(r".gz|.bgz$", gwas_internal_link)
if is_gzipped:
compression = 'gzip'
else:
compression = None
print(column_map.values)
print(column_map.index)
fullGWAS = pd.read_csv(gwas_internal_link, delim_whitespace=True,
usecols = column_map.values,
compression=compression,
names= column_map.index,
header=0, na_values= ['', '#N/A', '#N/A', 'N/A','#NA', '-1.#IND', '-1.#QNAN',
'-NaN', '-nan', '1.#IND', '1.#QNAN', 'N/A',
'NA', 'NULL', 'NaN',
'nan', 'na', '.', '-'],
dtype={"snpid":str, "a1":str,"a2":str,"freq":np.double, "z":np.double,"se":np.double, "pval":np.double})
print(fullGWAS.head())
#Ensure that allele are written in upper cases:
fullGWAS.a1 = fullGWAS.a1.str.upper()
fullGWAS.a2 = fullGWAS.a2.str.upper()
def sorted_alleles(x):
return "".join(sorted(x))
# either rs ID or full position must be available:
if not(("CHR" in fullGWAS.columns) and ("POS" in fullGWAS.columns)) and ("snpid" not in fullGWAS.columns):
raise ValueError("Summary statistic file {0} doesn't contain rsID or full SNP position".format(gwas_internal_link))
if ("CHR" in fullGWAS.columns) and ("POS" in fullGWAS.columns):
fullGWAS["positional_index"] = fullGWAS.CHR.apply(str)+fullGWAS.POS.apply(str)+(fullGWAS.a1+fullGWAS.a2).apply(sorted_alleles)
if ("snpid" not in fullGWAS.columns):
fullGWAS["snpid"] = fullGWAS["positional_index"]
fullGWAS.set_index("snpid", inplace=True)
print(fullGWAS.head())
if imputation_treshold:
fullGWAS = fullGWAS.loc[fullGWAS.imputation_quality > imputation_treshold]
fullGWAS = fullGWAS[~fullGWAS.index.duplicated(keep='first')]
#fullGWAS = convert_missing_values(fullGWAS)
return fullGWAS