HerrewebPy/herrewebpy/bioinformatics/sequence_alignment.py

292 lines
14 KiB
Python

from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from herrewebpy import logger
import tqdm
"""
Multiple sequence alignment script using Needleman-Wunsch. Writes to fasta and stockholm, as well as to a plain text file.
Method derived from RP1 and RP2 internships. Used as a data validation technique in Alstom.
Author: Jonathan Herrewijnen
Email: jonathan.herrewijnen@gmail.com
Date: 25 November
Herreweb
"""
def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename):
"""
Write a multiple sequence alignment with associated metadata in Stockholm format to a file.
Parameters:
- aligned_identifiers1 (list): List of identifiers for the first aligned sequence.
- aligned_identifiers2 (list): List of identifiers for the second aligned sequence.
- aligned_metadata1 (list): List of metadata annotations for the first aligned sequence.
- aligned_metadata2 (list): List of metadata annotations for the second aligned sequence.
- score (float): Alignment score to be included as a global feature.
- output_filename (str): Name of the file to write the Stockholm-formatted alignment.
The function opens the specified file in write mode, writes the Stockholm header,
and iterates over aligned sequences and their associated metadata, writing them to the file.
The alignment score is also included as a global feature. The file is closed automatically
upon exiting the function.
Example:
>>> aligned_identifiers1 = ['A', 'B', 'C']
>>> aligned_identifiers2 = ['X', 'Y', 'Z']
>>> aligned_metadata1 = ['metaA', 'metaB', 'metaC']
>>> aligned_metadata2 = ['metaX', 'metaY', 'metaZ']
>>> score = 42.0
>>> write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2,
... aligned_metadata1, aligned_metadata2, score, 'output.sto')
"""
with open(output_filename, 'w') as stockholm_file:
stockholm_file.write("# STOCKHOLM 1.0\n")
for id1, id2, metadata1, metadata2 in zip(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2):
stockholm_file.write(f"{id1}\n")
stockholm_file.write(f"{id2}\n")
stockholm_file.write(f"#=GC METADATA1 {metadata1}\n")
stockholm_file.write(f"#=GC METADATA2 {metadata2}\n")
stockholm_file.write(f"#=GF SCORE: {score}\n")
stockholm_file.close()
def write_clustal_alignment(sequences, output_filename):
"""
Write to clustal format
"""
with open(output_filename, 'w') as clustal_file:
for sequence in sequences:
clustal_file.write(f"{sequence.id.ljust(20)} {sequence.seq}\n")
def write_text_format(aligned_identifiers1, aligned_identifiers2, score, output_filename, aligned_metadata1=None,
aligned_metadata2=None):
"""
Write aligned identifiers in two seperate text files (for visual comparison)
"""
with open(f'1-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")]
with open(f'2-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")]
if aligned_metadata1 is not None and aligned_metadata2 is not None:
with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1} {metadata1}\n") for id1, metadata1 in zip(aligned_identifiers1, aligned_metadata1)]
with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2} {metadata2}\n") for id2, metadata2 in zip(aligned_identifiers2, aligned_metadata2)]
else:
with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1}\n") for id1 in aligned_identifiers1]
with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2]
class SequenceAlignment:
def __init__(self, sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-2,
match_score=1, mismatch_penalty=-1, filename="alignment", threads=None,
stockholm=True, fasta=True, clustal=False, padding='-'):
"""
Perform global sequence alignment and save the results in various formats.
Parameters:
sequence1 (str): The first sequence to align.
sequence2 (str): The second sequence to align.
metadata1 (str, optional): Metadata for the first sequence. Default is None.
metadata2 (str, optional): Metadata for the second sequence. Default is None.
gap_penalty (int, optional): Penalty for introducing a gap. Default is -1.
match_score (int, optional): Score for a match. Default is 1.
mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10.
filename (str, optional): Name for the output files. Default is "alignment".
threads (int, optional): Number of threads for parallel execution. Default is None.
stockholm (bool, optional): Whether to output in Stockholm format. Default is True.
fasta (bool, optional): Whether to output in FASTA format. Default is True.
clustal (bool, optional): Whether to output in Clustal format. Default is False.
padding (str, optional): Padding character for alignment. Default is '-'.
Returns:
int: The alignment score.
"""
self.sequence1 = sequence1
self.sequence2 = sequence2
self.metadata1 = metadata1
self.metadata2 = metadata2
self.gap_penalty = gap_penalty
self.match_score = match_score
self.mismatch_penalty = mismatch_penalty
self.filename = filename
self.threads = threads
self.stockholm = stockholm
self.fasta = fasta
self.clustal = clustal
self.padding = padding
self.align()
def align(self):
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score = self._global_alignment_np(
self.sequence1, self.sequence2, self.metadata1, self.metadata2,
self.gap_penalty, self.match_score, self.mismatch_penalty, self.threads, self.padding
)
if self.metadata1 is not None and self.metadata2 is not None:
if self.stockholm:
write_stockholm_alignment_with_metadata(
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score, f'{self.filename}.sto'
)
if self.fasta:
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt',
aligned_metadata1, aligned_metadata2)
else:
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt')
record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1")
record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")
if self.fasta:
SeqIO.write([record1, record2], f'{self.filename}.fasta', "fasta")
if self.clustal:
sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"),
SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")]
write_clustal_alignment(sequences, f'{self.filename}.aln')
self.score = score
def _parse_sequence(self, sequence):
"""
Parse a sequence, either as a pandas DataFrame or a string, and return the result.
Parameters:
sequence (pd.DataFrame or str): The input sequence data to be parsed.
"""
if sequence is None:
return None
elif isinstance(sequence, pd.Series):
logger.info("Parsing data as dataframe, so converting")
sequence = sequence.str.cat(sep='|')
elif isinstance(sequence, list):
sequence = '|'.join(sequence)
logger.info(f'Assuming type is string, returning list')
identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')]
return identifiers
def _pad_sequences(self, aligned_identifiers1, aligned_identifiers2, padding):
"""
Add paddings (-) to sequences. Drastically helps visualize alignments.
"""
padded_sequences1, padded_sequences2 = [], []
for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2):
if seq1 == '':
padded_seq1 = f'{padding}' * len(seq2)
padded_sequences1.append(padded_seq1)
padded_sequences2.append(seq2)
elif seq2 == '':
padded_seq2 = f'{padding}' * len(seq1)
padded_sequences1.append(seq1)
padded_sequences2.append(padded_seq2)
else:
if len(seq1) < len(seq2):
padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1))
padded_sequences1.append(padded_seq1)
padded_sequences2.append(seq2)
else:
padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2))
padded_sequences1.append(seq1)
padded_sequences2.append(padded_seq2)
return padded_sequences1, padded_sequences2
def _global_alignment_np(self, sequence1, sequence2, metadata1, metadata2, gap_penalty,
match_score, mismatch_penalty, threads, padding):
"""
Description:
This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`,
using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters
for gap penalties, match scores, and mismatch penalties.
The function returns a tuple containing the following elements:
- The aligned longer sequence (list of strings) where gaps are indicated by '-' characters.
- The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters.
- Aligned metadata for sequence1 (list of strings).
- Aligned metadata for sequence2 (list of strings).
- The alignment score (int).
Note:
If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata
lists will also be None.
Example:
```python
sequence1 = "AGCT"
sequence2 = "AAGCT"
aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = _global_alignment_np(
sequence1, sequence2, metadata1="ABC", metadata2="XYZ"
)
```
"""
identifiers1, metadata1 = self._parse_sequence(sequence1), self._parse_sequence(metadata1)
identifiers2, metadata2 = self._parse_sequence(sequence2), self._parse_sequence(metadata2)
m, n = len(identifiers1), len(identifiers2)
dp_matrix = np.zeros((m + 1, n + 1))
progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1))
with ThreadPoolExecutor(threads) as executor:
for i in range(1, m + 1):
for j in range(1, n + 1):
if identifiers1[i - 1] == identifiers2[j - 1]:
dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score
else:
dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty,
dp_matrix[i][j - 1] + gap_penalty,
dp_matrix[i - 1][j - 1] + mismatch_penalty)
progress_bar.update(1)
progress_bar.close()
aligned_identifiers1, aligned_metadata1 = [], []
aligned_identifiers2, aligned_metadata2 = [], []
score = dp_matrix[m][n]
logger.info(f'Calculated score is: {score}. (negative is generally bad, positive is good)')
match_count, mismatch_count, gap_count = 0, 0, 0
# Finds matches/mismatches and introduces gaps if mismatches are found
while m > 0 and n > 0:
if identifiers1[m - 1] == identifiers2[n - 1]:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
m -= 1
n -= 1
match_count += 1
elif dp_matrix[m][n] == dp_matrix[m - 1][n - 1] + mismatch_penalty:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
m -= 1
n -= 1
mismatch_count += 1
elif dp_matrix[m][n] == dp_matrix[m - 1][n] + gap_penalty:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append('')
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]),aligned_metadata2.append(metadata2[n - 1])
m -= 1
gap_count += 1
else:
aligned_identifiers1.append(''), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
n -= 1
logger.info(f'{mismatch_count} mismatches found and {gap_count} gaps found. {match_count} matches found.')
aligned_identifiers1.reverse(), aligned_identifiers2.reverse()
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.reverse(), aligned_metadata2.reverse()
padded_sequences1, padded_sequences2 =self._pad_sequences(aligned_identifiers1, aligned_identifiers2, padding)
return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score