292 lines
14 KiB
Python
Executable File
292 lines
14 KiB
Python
Executable File
from Bio import SeqIO
|
|
from Bio.SeqRecord import SeqRecord
|
|
from Bio.Seq import Seq
|
|
import pandas as pd
|
|
import numpy as np
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from herrewebpy import logger
|
|
import tqdm
|
|
|
|
"""
|
|
Multiple sequence alignment script using Needleman-Wunsch. Writes to fasta and stockholm, as well as to a plain text file.
|
|
|
|
Method derived from RP1 and RP2 internships. Used as a data validation technique in Alstom.
|
|
|
|
Author: Jonathan Herrewijnen
|
|
Email: jonathan.herrewijnen@gmail.com
|
|
Date: 25 November
|
|
Herreweb
|
|
"""
|
|
|
|
def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename):
|
|
"""
|
|
Write a multiple sequence alignment with associated metadata in Stockholm format to a file.
|
|
|
|
Parameters:
|
|
- aligned_identifiers1 (list): List of identifiers for the first aligned sequence.
|
|
- aligned_identifiers2 (list): List of identifiers for the second aligned sequence.
|
|
- aligned_metadata1 (list): List of metadata annotations for the first aligned sequence.
|
|
- aligned_metadata2 (list): List of metadata annotations for the second aligned sequence.
|
|
- score (float): Alignment score to be included as a global feature.
|
|
- output_filename (str): Name of the file to write the Stockholm-formatted alignment.
|
|
|
|
The function opens the specified file in write mode, writes the Stockholm header,
|
|
and iterates over aligned sequences and their associated metadata, writing them to the file.
|
|
The alignment score is also included as a global feature. The file is closed automatically
|
|
upon exiting the function.
|
|
|
|
Example:
|
|
>>> aligned_identifiers1 = ['A', 'B', 'C']
|
|
>>> aligned_identifiers2 = ['X', 'Y', 'Z']
|
|
>>> aligned_metadata1 = ['metaA', 'metaB', 'metaC']
|
|
>>> aligned_metadata2 = ['metaX', 'metaY', 'metaZ']
|
|
>>> score = 42.0
|
|
>>> write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2,
|
|
... aligned_metadata1, aligned_metadata2, score, 'output.sto')
|
|
"""
|
|
with open(output_filename, 'w') as stockholm_file:
|
|
stockholm_file.write("# STOCKHOLM 1.0\n")
|
|
for id1, id2, metadata1, metadata2 in zip(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2):
|
|
stockholm_file.write(f"{id1}\n")
|
|
stockholm_file.write(f"{id2}\n")
|
|
stockholm_file.write(f"#=GC METADATA1 {metadata1}\n")
|
|
stockholm_file.write(f"#=GC METADATA2 {metadata2}\n")
|
|
stockholm_file.write(f"#=GF SCORE: {score}\n")
|
|
stockholm_file.close()
|
|
|
|
|
|
def write_clustal_alignment(sequences, output_filename):
|
|
"""
|
|
Write to clustal format
|
|
"""
|
|
with open(output_filename, 'w') as clustal_file:
|
|
for sequence in sequences:
|
|
clustal_file.write(f"{sequence.id.ljust(20)} {sequence.seq}\n")
|
|
|
|
|
|
def write_text_format(aligned_identifiers1, aligned_identifiers2, score, output_filename, aligned_metadata1=None,
|
|
aligned_metadata2=None):
|
|
"""
|
|
Write aligned identifiers in two seperate text files (for visual comparison)
|
|
"""
|
|
with open(f'1-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")]
|
|
with open(f'2-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")]
|
|
|
|
if aligned_metadata1 is not None and aligned_metadata2 is not None:
|
|
with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1} {metadata1}\n") for id1, metadata1 in zip(aligned_identifiers1, aligned_metadata1)]
|
|
with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2} {metadata2}\n") for id2, metadata2 in zip(aligned_identifiers2, aligned_metadata2)]
|
|
else:
|
|
with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1}\n") for id1 in aligned_identifiers1]
|
|
with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2]
|
|
|
|
|
|
class SequenceAlignment:
|
|
def __init__(self, sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-2,
|
|
match_score=1, mismatch_penalty=-1, filename="alignment", threads=None,
|
|
stockholm=True, fasta=True, clustal=False, padding='-'):
|
|
"""
|
|
Perform global sequence alignment and save the results in various formats.
|
|
|
|
Parameters:
|
|
sequence1 (str): The first sequence to align.
|
|
sequence2 (str): The second sequence to align.
|
|
metadata1 (str, optional): Metadata for the first sequence. Default is None.
|
|
metadata2 (str, optional): Metadata for the second sequence. Default is None.
|
|
gap_penalty (int, optional): Penalty for introducing a gap. Default is -1.
|
|
match_score (int, optional): Score for a match. Default is 1.
|
|
mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10.
|
|
filename (str, optional): Name for the output files. Default is "alignment".
|
|
threads (int, optional): Number of threads for parallel execution. Default is None.
|
|
stockholm (bool, optional): Whether to output in Stockholm format. Default is True.
|
|
fasta (bool, optional): Whether to output in FASTA format. Default is True.
|
|
clustal (bool, optional): Whether to output in Clustal format. Default is False.
|
|
padding (str, optional): Padding character for alignment. Default is '-'.
|
|
|
|
Returns:
|
|
int: The alignment score.
|
|
"""
|
|
self.sequence1 = sequence1
|
|
self.sequence2 = sequence2
|
|
self.metadata1 = metadata1
|
|
self.metadata2 = metadata2
|
|
self.gap_penalty = gap_penalty
|
|
self.match_score = match_score
|
|
self.mismatch_penalty = mismatch_penalty
|
|
self.filename = filename
|
|
self.threads = threads
|
|
self.stockholm = stockholm
|
|
self.fasta = fasta
|
|
self.clustal = clustal
|
|
self.padding = padding
|
|
|
|
self.align()
|
|
|
|
|
|
def align(self):
|
|
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score = self._global_alignment_np(
|
|
self.sequence1, self.sequence2, self.metadata1, self.metadata2,
|
|
self.gap_penalty, self.match_score, self.mismatch_penalty, self.threads, self.padding
|
|
)
|
|
|
|
if self.metadata1 is not None and self.metadata2 is not None:
|
|
if self.stockholm:
|
|
write_stockholm_alignment_with_metadata(
|
|
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score, f'{self.filename}.sto'
|
|
)
|
|
if self.fasta:
|
|
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt',
|
|
aligned_metadata1, aligned_metadata2)
|
|
else:
|
|
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt')
|
|
|
|
record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1")
|
|
record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")
|
|
|
|
if self.fasta:
|
|
SeqIO.write([record1, record2], f'{self.filename}.fasta', "fasta")
|
|
|
|
if self.clustal:
|
|
sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"),
|
|
SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")]
|
|
write_clustal_alignment(sequences, f'{self.filename}.aln')
|
|
|
|
self.score = score
|
|
|
|
|
|
def _parse_sequence(self, sequence):
|
|
"""
|
|
Parse a sequence, either as a pandas DataFrame or a string, and return the result.
|
|
|
|
Parameters:
|
|
sequence (pd.DataFrame or str): The input sequence data to be parsed.
|
|
"""
|
|
if sequence is None:
|
|
return None
|
|
elif isinstance(sequence, pd.Series):
|
|
logger.info("Parsing data as dataframe, so converting")
|
|
sequence = sequence.str.cat(sep='|')
|
|
elif isinstance(sequence, list):
|
|
sequence = '|'.join(sequence)
|
|
logger.info(f'Assuming type is string, returning list')
|
|
identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')]
|
|
return identifiers
|
|
|
|
|
|
def _pad_sequences(self, aligned_identifiers1, aligned_identifiers2, padding):
|
|
"""
|
|
Add paddings (-) to sequences. Drastically helps visualize alignments.
|
|
"""
|
|
padded_sequences1, padded_sequences2 = [], []
|
|
for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2):
|
|
if seq1 == '':
|
|
padded_seq1 = f'{padding}' * len(seq2)
|
|
padded_sequences1.append(padded_seq1)
|
|
padded_sequences2.append(seq2)
|
|
elif seq2 == '':
|
|
padded_seq2 = f'{padding}' * len(seq1)
|
|
padded_sequences1.append(seq1)
|
|
padded_sequences2.append(padded_seq2)
|
|
else:
|
|
if len(seq1) < len(seq2):
|
|
padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1))
|
|
padded_sequences1.append(padded_seq1)
|
|
padded_sequences2.append(seq2)
|
|
else:
|
|
padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2))
|
|
padded_sequences1.append(seq1)
|
|
padded_sequences2.append(padded_seq2)
|
|
return padded_sequences1, padded_sequences2
|
|
|
|
|
|
def _global_alignment_np(self, sequence1, sequence2, metadata1, metadata2, gap_penalty,
|
|
match_score, mismatch_penalty, threads, padding):
|
|
"""
|
|
Description:
|
|
This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`,
|
|
using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters
|
|
for gap penalties, match scores, and mismatch penalties.
|
|
|
|
The function returns a tuple containing the following elements:
|
|
- The aligned longer sequence (list of strings) where gaps are indicated by '-' characters.
|
|
- The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters.
|
|
- Aligned metadata for sequence1 (list of strings).
|
|
- Aligned metadata for sequence2 (list of strings).
|
|
- The alignment score (int).
|
|
|
|
Note:
|
|
If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata
|
|
lists will also be None.
|
|
|
|
Example:
|
|
```python
|
|
sequence1 = "AGCT"
|
|
sequence2 = "AAGCT"
|
|
aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = _global_alignment_np(
|
|
sequence1, sequence2, metadata1="ABC", metadata2="XYZ"
|
|
)
|
|
```
|
|
|
|
"""
|
|
identifiers1, metadata1 = self._parse_sequence(sequence1), self._parse_sequence(metadata1)
|
|
identifiers2, metadata2 = self._parse_sequence(sequence2), self._parse_sequence(metadata2)
|
|
m, n = len(identifiers1), len(identifiers2)
|
|
dp_matrix = np.zeros((m + 1, n + 1))
|
|
|
|
progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1))
|
|
with ThreadPoolExecutor(threads) as executor:
|
|
for i in range(1, m + 1):
|
|
for j in range(1, n + 1):
|
|
if identifiers1[i - 1] == identifiers2[j - 1]:
|
|
dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score
|
|
else:
|
|
dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty,
|
|
dp_matrix[i][j - 1] + gap_penalty,
|
|
dp_matrix[i - 1][j - 1] + mismatch_penalty)
|
|
progress_bar.update(1)
|
|
progress_bar.close()
|
|
|
|
aligned_identifiers1, aligned_metadata1 = [], []
|
|
aligned_identifiers2, aligned_metadata2 = [], []
|
|
|
|
score = dp_matrix[m][n]
|
|
logger.info(f'Calculated score is: {score}. (negative is generally bad, positive is good)')
|
|
match_count, mismatch_count, gap_count = 0, 0, 0
|
|
|
|
# Finds matches/mismatches and introduces gaps if mismatches are found
|
|
while m > 0 and n > 0:
|
|
if identifiers1[m - 1] == identifiers2[n - 1]:
|
|
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
|
|
if metadata1 is not None and metadata2 is not None:
|
|
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
|
|
m -= 1
|
|
n -= 1
|
|
match_count += 1
|
|
elif dp_matrix[m][n] == dp_matrix[m - 1][n - 1] + mismatch_penalty:
|
|
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
|
|
if metadata1 is not None and metadata2 is not None:
|
|
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
|
|
m -= 1
|
|
n -= 1
|
|
mismatch_count += 1
|
|
elif dp_matrix[m][n] == dp_matrix[m - 1][n] + gap_penalty:
|
|
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append('')
|
|
if metadata1 is not None and metadata2 is not None:
|
|
aligned_metadata1.append(metadata1[m - 1]),aligned_metadata2.append(metadata2[n - 1])
|
|
m -= 1
|
|
gap_count += 1
|
|
else:
|
|
aligned_identifiers1.append(''), aligned_identifiers2.append(identifiers2[n - 1])
|
|
if metadata1 is not None and metadata2 is not None:
|
|
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
|
|
n -= 1
|
|
|
|
logger.info(f'{mismatch_count} mismatches found and {gap_count} gaps found. {match_count} matches found.')
|
|
|
|
aligned_identifiers1.reverse(), aligned_identifiers2.reverse()
|
|
if metadata1 is not None and metadata2 is not None:
|
|
aligned_metadata1.reverse(), aligned_metadata2.reverse()
|
|
|
|
padded_sequences1, padded_sequences2 =self._pad_sequences(aligned_identifiers1, aligned_identifiers2, padding)
|
|
|
|
return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score
|