Updating sequence alignment

This commit is contained in:
Jonathan Herrewijnen 2023-12-02 13:35:13 +01:00
parent 154cee5826
commit ce2384cb72
3 changed files with 220 additions and 192 deletions

View File

@ -1,5 +1,2 @@
from herrewebpy.mlops import anomaly_scoring from herrewebpy.bioinformatics import sequence_alignment
import seaborn as sns sequence_alignment.SequenceAlignment(['aa', 'bb', 'cc'],['bb','aa','cc'], ['1','2','3'], ['1','2','3'])
df = sns.load_dataset('iris')
anomaly_scoring.train_model(df)

View File

@ -0,0 +1,28 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Perform a sequence alignment with metadata (align sequence number 2 on sequence number 1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from herrewebpy.bioinformatics import sequence_alignment\n",
"sequence_alignment.SequenceAlignment(['aa', 'bb', 'cc'],['bb','aa','cc'], ['1','2','3'], ['1','2','3'])"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -18,23 +18,6 @@ Date: 25 November
Herreweb Herreweb
""" """
def _parse_sequence(sequence):
"""
Parse a sequence, either as a pandas DataFrame or a string, and return the result.
Parameters:
sequence (pd.DataFrame or str): The input sequence data to be parsed.
"""
if sequence is None:
return None
elif isinstance(sequence, pd.Series):
logger.info("Parsing data as dataframe, so converting")
sequence = sequence.str.cat(sep='|')
logger.info(f'Assuming type is string, returning list')
identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')]
return identifiers
def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename): def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename):
""" """
Write a multiple sequence alignment with associated metadata in Stockholm format to a file. Write a multiple sequence alignment with associated metadata in Stockholm format to a file.
@ -97,192 +80,212 @@ def write_text_format(aligned_identifiers1, aligned_identifiers2, score, output_
with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2] with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2]
def _pad_sequences(aligned_identifiers1, aligned_identifiers2, padding): class SequenceAlignment:
def __init__(self, sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-2,
match_score=1, mismatch_penalty=-1, filename="alignment", threads=None,
stockholm=True, fasta=True, clustal=False, padding='-'):
""" """
Add paddings (-) to sequences. Drastically helps visualize alignments. Perform global sequence alignment and save the results in various formats.
Parameters:
sequence1 (str): The first sequence to align.
sequence2 (str): The second sequence to align.
metadata1 (str, optional): Metadata for the first sequence. Default is None.
metadata2 (str, optional): Metadata for the second sequence. Default is None.
gap_penalty (int, optional): Penalty for introducing a gap. Default is -1.
match_score (int, optional): Score for a match. Default is 1.
mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10.
filename (str, optional): Name for the output files. Default is "alignment".
threads (int, optional): Number of threads for parallel execution. Default is None.
stockholm (bool, optional): Whether to output in Stockholm format. Default is True.
fasta (bool, optional): Whether to output in FASTA format. Default is True.
clustal (bool, optional): Whether to output in Clustal format. Default is False.
padding (str, optional): Padding character for alignment. Default is '-'.
Returns:
int: The alignment score.
""" """
padded_sequences1, padded_sequences2 = [], [] self.sequence1 = sequence1
for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2): self.sequence2 = sequence2
if seq1 == '': self.metadata1 = metadata1
padded_seq1 = f'{padding}' * len(seq2) self.metadata2 = metadata2
padded_sequences1.append(padded_seq1) self.gap_penalty = gap_penalty
padded_sequences2.append(seq2) self.match_score = match_score
elif seq2 == '': self.mismatch_penalty = mismatch_penalty
padded_seq2 = f'{padding}' * len(seq1) self.filename = filename
padded_sequences1.append(seq1) self.threads = threads
padded_sequences2.append(padded_seq2) self.stockholm = stockholm
else: self.fasta = fasta
if len(seq1) < len(seq2): self.clustal = clustal
padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1)) self.padding = padding
self.align()
def align(self):
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score = self._global_alignment_np(
self.sequence1, self.sequence2, self.metadata1, self.metadata2,
self.gap_penalty, self.match_score, self.mismatch_penalty, self.threads, self.padding
)
if self.metadata1 is not None and self.metadata2 is not None:
if self.stockholm:
write_stockholm_alignment_with_metadata(
padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score, f'{self.filename}.sto'
)
if self.fasta:
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt',
aligned_metadata1, aligned_metadata2)
else:
write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt')
record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1")
record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")
if self.fasta:
SeqIO.write([record1, record2], f'{self.filename}.fasta', "fasta")
if self.clustal:
sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"),
SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")]
write_clustal_alignment(sequences, f'{self.filename}.aln')
self.score = score
def _parse_sequence(self, sequence):
"""
Parse a sequence, either as a pandas DataFrame or a string, and return the result.
Parameters:
sequence (pd.DataFrame or str): The input sequence data to be parsed.
"""
if sequence is None:
return None
elif isinstance(sequence, pd.Series):
logger.info("Parsing data as dataframe, so converting")
sequence = sequence.str.cat(sep='|')
elif isinstance(sequence, list):
sequence = '|'.join(sequence)
logger.info(f'Assuming type is string, returning list')
identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')]
return identifiers
def _pad_sequences(self, aligned_identifiers1, aligned_identifiers2, padding):
"""
Add paddings (-) to sequences. Drastically helps visualize alignments.
"""
padded_sequences1, padded_sequences2 = [], []
for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2):
if seq1 == '':
padded_seq1 = f'{padding}' * len(seq2)
padded_sequences1.append(padded_seq1) padded_sequences1.append(padded_seq1)
padded_sequences2.append(seq2) padded_sequences2.append(seq2)
else: elif seq2 == '':
padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2)) padded_seq2 = f'{padding}' * len(seq1)
padded_sequences1.append(seq1) padded_sequences1.append(seq1)
padded_sequences2.append(padded_seq2) padded_sequences2.append(padded_seq2)
return padded_sequences1, padded_sequences2
def global_alignment_np(sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-1,
match_score=1, mismatch_penalty=-10, threads=None):
"""
Description:
This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`,
using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters
for gap penalties, match scores, and mismatch penalties.
The function returns a tuple containing the following elements:
- The aligned longer sequence (list of strings) where gaps are indicated by '-' characters.
- The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters.
- Aligned metadata for sequence1 (list of strings).
- Aligned metadata for sequence2 (list of strings).
- The alignment score (int).
Note:
If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata
lists will also be None.
Example:
```python
sequence1 = "AGCT"
sequence2 = "AAGCT"
aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = global_alignment_np(
sequence1, sequence2, metadata1="ABC", metadata2="XYZ"
)
```
"""
identifiers1, metadata1 = _parse_sequence(sequence1), _parse_sequence(metadata1)
identifiers2, metadata2 = _parse_sequence(sequence2), _parse_sequence(metadata2)
m, n = len(identifiers1), len(identifiers2)
dp_matrix = np.zeros((m + 1, n + 1))
progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1))
with ThreadPoolExecutor(threads) as executor:
for i in range(1, m + 1):
for j in range(1, n + 1):
if identifiers1[i - 1] == identifiers2[j - 1]:
dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score
else: else:
dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty, if len(seq1) < len(seq2):
dp_matrix[i][j - 1] + gap_penalty, padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1))
dp_matrix[i - 1][j - 1] + mismatch_penalty) padded_sequences1.append(padded_seq1)
progress_bar.update(1) padded_sequences2.append(seq2)
progress_bar.close() else:
padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2))
aligned_identifiers1, aligned_metadata1 = [], [] padded_sequences1.append(seq1)
aligned_identifiers2, aligned_metadata2 = [], [] padded_sequences2.append(padded_seq2)
return padded_sequences1, padded_sequences2
i, j = m, n
score = dp_matrix[m][n]
logger.info(f'Calculated score is: {score}')
while i > 0 and j > 0:
if identifiers1[i - 1] == identifiers2[j - 1]:
aligned_identifiers1.append(identifiers1[i - 1])
aligned_identifiers2.append(identifiers2[j - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[i - 1])
aligned_metadata2.append(metadata2[j - 1])
i -= 1
j -= 1
elif dp_matrix[i][j] == dp_matrix[i - 1][j - 1] + mismatch_penalty:
aligned_identifiers1.append(identifiers1[i - 1])
aligned_identifiers2.append(identifiers2[j - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[i - 1])
aligned_metadata2.append(metadata2[j - 1])
i -= 1
j -= 1
elif dp_matrix[i][j] == dp_matrix[i - 1][j] + gap_penalty:
aligned_identifiers1.append(identifiers1[i - 1])
aligned_identifiers2.append('')
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[i - 1])
aligned_metadata2.append(metadata2[j - 1])
i -= 1
else:
aligned_identifiers1.append('')
aligned_identifiers2.append(identifiers2[j - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[i - 1])
aligned_metadata2.append(metadata2[j - 1])
j -= 1
aligned_identifiers1.reverse()
aligned_identifiers2.reverse()
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.reverse()
aligned_metadata2.reverse()
padded_sequences1, padded_sequences2 =_pad_sequences(aligned_identifiers1, aligned_identifiers2, padding)
return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score
def sequence_alignment(sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-1, def _global_alignment_np(self, sequence1, sequence2, metadata1, metadata2, gap_penalty,
match_score=1, mismatch_penalty=-10, filename="alignment", threads=None, match_score, mismatch_penalty, threads, padding):
stockholm=True, fasta=True, clustal=False, padding='-'): """
""" Description:
Perform global sequence alignment and save the results in various formats. This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`,
using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters
for gap penalties, match scores, and mismatch penalties.
Parameters: The function returns a tuple containing the following elements:
sequence1 (str): The first sequence to align. - The aligned longer sequence (list of strings) where gaps are indicated by '-' characters.
sequence2 (str): The second sequence to align. - The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters.
metadata1 (str, optional): Metadata for the first sequence. Default is None. - Aligned metadata for sequence1 (list of strings).
metadata2 (str, optional): Metadata for the second sequence. Default is None. - Aligned metadata for sequence2 (list of strings).
gap_penalty (int, optional): Penalty for introducing a gap. Default is -1. - The alignment score (int).
match_score (int, optional): Score for a match. Default is 1.
mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10.
filename (str, optional): Name for the output files. Default is "alignment".
threads (int, optional): Number of threads for parallel execution. Default is None.
stockholm (bool, optional): Whether to output in Stockholm format. Default is True.
fasta (bool, optional): Whether to output in FASTA format. Default is True.
Returns: Note:
int: The alignment score. If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata
lists will also be None.
Description: Example:
This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`, ```python
using the Needleman-Wunsch algorithm. It automatically determines the optimal scoring parameters for gap penalties, sequence1 = "AGCT"
match scores, and mismatch penalties based on a sample alignment. sequence2 = "AAGCT"
aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = _global_alignment_np(
sequence1, sequence2, metadata1="ABC", metadata2="XYZ"
)
```
The function saves the alignment in various formats based on the specified options: """
- Stockholm format if `stockholm` is True. identifiers1, metadata1 = self._parse_sequence(sequence1), self._parse_sequence(metadata1)
- FASTA format if `fasta` is True. identifiers2, metadata2 = self._parse_sequence(sequence2), self._parse_sequence(metadata2)
- A text file with the aligned sequences and metadata. m, n = len(identifiers1), len(identifiers2)
dp_matrix = np.zeros((m + 1, n + 1))
The alignment score is returned as an integer. progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1))
with ThreadPoolExecutor(threads) as executor:
for i in range(1, m + 1):
for j in range(1, n + 1):
if identifiers1[i - 1] == identifiers2[j - 1]:
dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score
else:
dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty,
dp_matrix[i][j - 1] + gap_penalty,
dp_matrix[i - 1][j - 1] + mismatch_penalty)
progress_bar.update(1)
progress_bar.close()
Example: aligned_identifiers1, aligned_metadata1 = [], []
alignment_score = sequence_alignment("AGTACG", "ATGC", metadata1="abc", metadata2="def", gap_penalty=-2, aligned_identifiers2, aligned_metadata2 = [], []
match_score=2, mismatch_penalty=-1, filename="my_alignment",
threads=4, stockholm=True, fasta=True)
"""
padded_sequences1, padded_sequences2, aligned_metadata1, \
aligned_metadata2, score = global_alignment_np(sequence1, sequence2, metadata1, metadata2, gap_penalty,
match_score, mismatch_penalty, threads, padding)
if metadata1 is not None and metadata2 is not None: score = dp_matrix[m][n]
if stockholm is True: logger.info(f'Calculated score is: {score}. (negative is generally bad, positive is good)')
write_stockholm_alignment_with_metadata(padded_sequences1, padded_sequences2, aligned_metadata1, match_count, mismatch_count, gap_count = 0, 0, 0
aligned_metadata2, score, f'{filename}.sto')
if fasta is True:
write_text_format(padded_sequences1, padded_sequences2, score, f'{filename}-text.txt',
aligned_metadata1, aligned_metadata2)
else:
write_text_format(padded_sequences1, padded_sequences2, score, f'{filename}-text.txt')
record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1") # Finds matches/mismatches and introduces gaps if mismatches are found
record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2") while m > 0 and n > 0:
if identifiers1[m - 1] == identifiers2[n - 1]:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
m -= 1
n -= 1
match_count += 1
elif dp_matrix[m][n] == dp_matrix[m - 1][n - 1] + mismatch_penalty:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
m -= 1
n -= 1
mismatch_count += 1
elif dp_matrix[m][n] == dp_matrix[m - 1][n] + gap_penalty:
aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append('')
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]),aligned_metadata2.append(metadata2[n - 1])
m -= 1
gap_count += 1
else:
aligned_identifiers1.append(''), aligned_identifiers2.append(identifiers2[n - 1])
if metadata1 is not None and metadata2 is not None:
aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1])
n -= 1
if fasta is True: logger.info(f'{mismatch_count} mismatches found and {gap_count} gaps found. {match_count} matches found.')
SeqIO.write([record1, record2], f'{filename}.fasta', "fasta")
if clustal is True: aligned_identifiers1.reverse(), aligned_identifiers2.reverse()
sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"), if metadata1 is not None and metadata2 is not None:
SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")] aligned_metadata1.reverse(), aligned_metadata2.reverse()
write_clustal_alignment(sequences, f'{filename}.aln')
return score padded_sequences1, padded_sequences2 =self._pad_sequences(aligned_identifiers1, aligned_identifiers2, padding)
return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score