"""
Module of function
"""
import pandas as pd
import numpy as np
import jass_preprocessing.dna_utils as dna_u
import warnings
[docs]
def read_reference(gwas_reference_panel, mask_MHC=False, minimum_MAF=None, region_to_mask=None):
"""
helper function to name correctly the column
Args:
gwas_reference_panel (str): path toward the reference panel file
mask_MHC (bool): Whether the MHC region should be masked or not. default is False
Filter the reference panel by minimum allele frequency (hg19 coordinate)
minimum_MAF (float): minimum allele frequency for a SNPs to be retain in the panel
region_to_mask (dict): a list of additional regions to mask
type_of_index(str): 'rs-number' or 'positional'
Return:
ref (pandas dataframe): the reference_panel with the specified filter applied
"""
ref = pd.read_csv(gwas_reference_panel, header=None, sep= "\t",
names =[ 'chr', "snp_id", "MAF","pos", "ref", "alt"],
dtype = {"chr": str, "snp_id":str, "MAF": np.double, "pos":np.double, "ref":str, "alt":str},
index_col="snp_id")
ref = ref.loc[~ref.pos.isnull()]
ref.pos = ref.pos.astype('int')
def sorted_alleles(x):
return "".join(sorted(x))
#Filter Strand ambiguous if biallelic
ref = ref.loc[~(ref.ref+ref.alt).isin(["AT", "TA", 'CG','GC'])]
print("REFERENCE")
print(ref.head())
ref["positional_index"] = ref.chr.apply(str)+ref.pos.apply(str)+(ref.ref+ref.alt).apply(sorted_alleles)
if mask_MHC:
ref = ref.loc[(ref.chr !=6)|(ref.pos < 28477797)|(ref.pos > 33448354)]
if minimum_MAF is not None:
ref = ref.loc[ref.MAF > minimum_MAF]
if region_to_mask is not None:
for reg in region_to_mask:
ref = ref.loc[(ref.chr != reg["chr"]) | (ref.pos <reg["start"])|(ref.pos > reg["end"])]
return(ref)
[docs]
def map_on_ref_panel(gw_df , ref_panel, index_type="rs-number"):
"""
Merge Gwas dataframe with the reference panel
Make sure that the same SNPs are in the reference panel and the gwas
Args:
gw_df (pandas dataframe): GWAS study dataframe
ref_panel (pandas dataframe): reference panel dataframe
Return:
merge_GWAS (pandas dataframe): merge studies,
"""
if index_type=="rs-number":
merge_GWAS = pd.merge(ref_panel, gw_df,
how='inner', indicator=True, left_index=True, right_index=True)
print("SNPs {}".format(merge_GWAS.shape[0]))
# If the GWAS file contains indexes of the kind chr*:position, perform a
# second join
if gw_df.index.map(str).str.contains("^chr*", case=False).any():
ref_panel['key2'] = "chr"+ref_panel.chr.map(str) + ":" +ref_panel.pos.map(str)
other_snp = pd.merge(ref_panel, gw_df, how='inner', indicator=True,
left_on ='key2', right_index=True)
merge_GWAS = pd.concat([other_snp, merge_GWAS])
else:
if index_type=="positional":
print("length of intersection")
print(len(pd.Index(ref_panel.positional_index).intersection(pd.Index(gw_df.positional_index))))
merge_GWAS = pd.merge(ref_panel.reset_index(), gw_df,
how='inner', indicator=True, on="positional_index")
print(merge_GWAS)
merge_GWAS.set_index("snp_id", inplace=True)
else:
raise ValueError("index_type can take only two values: 'rs-number' or 'positional'")
if (("pos" in merge_GWAS.columns) and ("POS" in merge_GWAS.columns)):
if ((merge_GWAS.pos == merge_GWAS.POS).mean()> 0.95):
merge_GWAS = merge_GWAS.loc[(merge_GWAS.pos == merge_GWAS.POS)]
else:
raise ValueError("SNP positions in reference panel and in Summary statistic are different! Different assembly?")
print("before filter")
print(merge_GWAS.shape)
merge_GWAS = merge_GWAS[~merge_GWAS.index.duplicated(keep='first')]
print("after filter")
print(merge_GWAS.shape)
merge_GWAS.index.rename("snp_id", inplace=True)
return(merge_GWAS)
[docs]
def compute_is_flipped(mgwas):
"""
Check if the reference panel and the GWAS data have the same reference
allele. return a boolean vector.
Args:
mgwas (pandas dataframe): GWAS study dataframe merged with the reference_panel
Return:
is_flipped (pandas dataframe): merge studies,
"""
flipped = pd.DataFrame({"ref_flipped" : (mgwas.ref == mgwas.a2), "alt_flipped" : (mgwas.alt == mgwas.a1)})
flipped_complement = pd.DataFrame({"ref_flippedc" : (mgwas.ref == mgwas.a2c), "alt_flippedc" : (mgwas.alt == mgwas.a1c)})
is_flipped = pd.DataFrame({"flipped":flipped.all(axis=1), # The allele of the
"flipped_complement":flipped_complement.all(axis=1)}
).any(axis=1)
return is_flipped
[docs]
def compute_is_aligned(mgwas):
"""
Check if the reference panel and the GWAS data have the same reference
allele. return a boolean vector.
The function should be the complement of "is_flipped" but we still compute
the two function to eventually detect weird cases (more than two alleles for
instance)
"""
aligned = pd.DataFrame({"ref_ok" : (mgwas.ref == mgwas.a1), "alt_ok" : (mgwas.alt == mgwas.a2)})
aligned_complement = pd.DataFrame({"ref_ok" : (mgwas.ref == mgwas.a1c), "alt_ok" : (mgwas.alt == mgwas.a2c)})
is_aligned = pd.DataFrame({"aligned":aligned.all(axis=1), # The allele of the
"aligned_complement":aligned_complement.all(axis=1)}).any(axis=1)
return is_aligned
[docs]
def compute_snp_alignement(mgwas):
"""
Add a column to mgwas indicating if the reference and coded
allele is flipped compared to the reference panel.
If it is, the sign of the statistic must be flipped
Args:
mgwas: a pandas dataframe of the GWAS data merged
with the reference panel
"""
#ensure that allele are upper cases:
mgwas['a1'] = mgwas.a1.str.upper()
mgwas['a2'] = mgwas.a2.str.upper()
mgwas['a1c'] = dna_u.dna_complement(mgwas.a1)
mgwas['a2c'] = dna_u.dna_complement(mgwas.a2)
print(mgwas)
is_aligned = compute_is_aligned(mgwas)
is_flipped = compute_is_flipped(mgwas)
# does some SNP are not "alignable" on reference
not_aligned_at_all = ~pd.DataFrame([is_aligned, is_flipped ]).any() #
if(not_aligned_at_all.any()):
warnings.warn('Some snps are not aligned at all with the reference panel\nand will be filtered:\n{}'.format(mgwas.index[is_flipped]),
DeprecationWarning)
mgwas = mgwas.loc[~not_aligned_at_all]
aligned_everywhere = pd.DataFrame([is_aligned, is_flipped ]).all()
if(aligned_everywhere.any()):
raise ValueError('Some snps are both aligned and flipped:\n{}'.format(mgwas.index[is_flipped]),
DeprecationWarning)
mgwas['sign_flip'] = np.nan
mgwas.loc[is_aligned,"sign_flip"] = 1
mgwas.loc[is_flipped, "sign_flip"] = -1
return(mgwas)