diff --git a/debug.py b/debug.py index 72ef043..e022498 100644 --- a/debug.py +++ b/debug.py @@ -1,5 +1,2 @@ -from herrewebpy.mlops import anomaly_scoring -import seaborn as sns - -df = sns.load_dataset('iris') -anomaly_scoring.train_model(df) \ No newline at end of file +from herrewebpy.bioinformatics import sequence_alignment +sequence_alignment.SequenceAlignment(['aa', 'bb', 'cc'],['bb','aa','cc'], ['1','2','3'], ['1','2','3']) \ No newline at end of file diff --git a/examples/bioinformatics.ipynb b/examples/bioinformatics.ipynb new file mode 100644 index 0000000..57d0b9d --- /dev/null +++ b/examples/bioinformatics.ipynb @@ -0,0 +1,28 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Perform a sequence alignment with metadata (align sequence number 2 on sequence number 1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from herrewebpy.bioinformatics import sequence_alignment\n", + "sequence_alignment.SequenceAlignment(['aa', 'bb', 'cc'],['bb','aa','cc'], ['1','2','3'], ['1','2','3'])" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/herrewebpy/bioinformatics/sequence_alignment.py b/herrewebpy/bioinformatics/sequence_alignment.py index dd2c661..b30580d 100644 --- a/herrewebpy/bioinformatics/sequence_alignment.py +++ b/herrewebpy/bioinformatics/sequence_alignment.py @@ -18,23 +18,6 @@ Date: 25 November Herreweb """ -def _parse_sequence(sequence): - """ - Parse a sequence, either as a pandas DataFrame or a string, and return the result. - - Parameters: - sequence (pd.DataFrame or str): The input sequence data to be parsed. - """ - if sequence is None: - return None - elif isinstance(sequence, pd.Series): - logger.info("Parsing data as dataframe, so converting") - sequence = sequence.str.cat(sep='|') - logger.info(f'Assuming type is string, returning list') - identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')] - return identifiers - - def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename): """ Write a multiple sequence alignment with associated metadata in Stockholm format to a file. @@ -97,192 +80,212 @@ def write_text_format(aligned_identifiers1, aligned_identifiers2, score, output_ with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2] -def _pad_sequences(aligned_identifiers1, aligned_identifiers2, padding): +class SequenceAlignment: + def __init__(self, sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-2, + match_score=1, mismatch_penalty=-1, filename="alignment", threads=None, + stockholm=True, fasta=True, clustal=False, padding='-'): """ - Add paddings (-) to sequences. Drastically helps visualize alignments. + Perform global sequence alignment and save the results in various formats. + + Parameters: + sequence1 (str): The first sequence to align. + sequence2 (str): The second sequence to align. + metadata1 (str, optional): Metadata for the first sequence. Default is None. + metadata2 (str, optional): Metadata for the second sequence. Default is None. + gap_penalty (int, optional): Penalty for introducing a gap. Default is -1. + match_score (int, optional): Score for a match. Default is 1. + mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10. + filename (str, optional): Name for the output files. Default is "alignment". + threads (int, optional): Number of threads for parallel execution. Default is None. + stockholm (bool, optional): Whether to output in Stockholm format. Default is True. + fasta (bool, optional): Whether to output in FASTA format. Default is True. + clustal (bool, optional): Whether to output in Clustal format. Default is False. + padding (str, optional): Padding character for alignment. Default is '-'. + + Returns: + int: The alignment score. """ - padded_sequences1, padded_sequences2 = [], [] - for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2): - if seq1 == '': - padded_seq1 = f'{padding}' * len(seq2) - padded_sequences1.append(padded_seq1) - padded_sequences2.append(seq2) - elif seq2 == '': - padded_seq2 = f'{padding}' * len(seq1) - padded_sequences1.append(seq1) - padded_sequences2.append(padded_seq2) - else: - if len(seq1) < len(seq2): - padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1)) + self.sequence1 = sequence1 + self.sequence2 = sequence2 + self.metadata1 = metadata1 + self.metadata2 = metadata2 + self.gap_penalty = gap_penalty + self.match_score = match_score + self.mismatch_penalty = mismatch_penalty + self.filename = filename + self.threads = threads + self.stockholm = stockholm + self.fasta = fasta + self.clustal = clustal + self.padding = padding + + self.align() + + + def align(self): + padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score = self._global_alignment_np( + self.sequence1, self.sequence2, self.metadata1, self.metadata2, + self.gap_penalty, self.match_score, self.mismatch_penalty, self.threads, self.padding + ) + + if self.metadata1 is not None and self.metadata2 is not None: + if self.stockholm: + write_stockholm_alignment_with_metadata( + padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score, f'{self.filename}.sto' + ) + if self.fasta: + write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt', + aligned_metadata1, aligned_metadata2) + else: + write_text_format(padded_sequences1, padded_sequences2, score, f'{self.filename}-text.txt') + + record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1") + record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2") + + if self.fasta: + SeqIO.write([record1, record2], f'{self.filename}.fasta', "fasta") + + if self.clustal: + sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"), + SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")] + write_clustal_alignment(sequences, f'{self.filename}.aln') + + self.score = score + + + def _parse_sequence(self, sequence): + """ + Parse a sequence, either as a pandas DataFrame or a string, and return the result. + + Parameters: + sequence (pd.DataFrame or str): The input sequence data to be parsed. + """ + if sequence is None: + return None + elif isinstance(sequence, pd.Series): + logger.info("Parsing data as dataframe, so converting") + sequence = sequence.str.cat(sep='|') + elif isinstance(sequence, list): + sequence = '|'.join(sequence) + logger.info(f'Assuming type is string, returning list') + identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')] + return identifiers + + + def _pad_sequences(self, aligned_identifiers1, aligned_identifiers2, padding): + """ + Add paddings (-) to sequences. Drastically helps visualize alignments. + """ + padded_sequences1, padded_sequences2 = [], [] + for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2): + if seq1 == '': + padded_seq1 = f'{padding}' * len(seq2) padded_sequences1.append(padded_seq1) padded_sequences2.append(seq2) - else: - padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2)) + elif seq2 == '': + padded_seq2 = f'{padding}' * len(seq1) padded_sequences1.append(seq1) padded_sequences2.append(padded_seq2) - return padded_sequences1, padded_sequences2 - - -def global_alignment_np(sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-1, - match_score=1, mismatch_penalty=-10, threads=None): - """ - Description: - This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`, - using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters - for gap penalties, match scores, and mismatch penalties. - - The function returns a tuple containing the following elements: - - The aligned longer sequence (list of strings) where gaps are indicated by '-' characters. - - The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters. - - Aligned metadata for sequence1 (list of strings). - - Aligned metadata for sequence2 (list of strings). - - The alignment score (int). - - Note: - If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata - lists will also be None. - - Example: - ```python - sequence1 = "AGCT" - sequence2 = "AAGCT" - aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = global_alignment_np( - sequence1, sequence2, metadata1="ABC", metadata2="XYZ" - ) - ``` - - """ - identifiers1, metadata1 = _parse_sequence(sequence1), _parse_sequence(metadata1) - identifiers2, metadata2 = _parse_sequence(sequence2), _parse_sequence(metadata2) - m, n = len(identifiers1), len(identifiers2) - dp_matrix = np.zeros((m + 1, n + 1)) - - progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1)) - with ThreadPoolExecutor(threads) as executor: - for i in range(1, m + 1): - for j in range(1, n + 1): - if identifiers1[i - 1] == identifiers2[j - 1]: - dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score else: - dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty, - dp_matrix[i][j - 1] + gap_penalty, - dp_matrix[i - 1][j - 1] + mismatch_penalty) - progress_bar.update(1) - progress_bar.close() + if len(seq1) < len(seq2): + padded_seq1 = seq1 + f'{padding}' * (len(seq2) - len(seq1)) + padded_sequences1.append(padded_seq1) + padded_sequences2.append(seq2) + else: + padded_seq2 = seq2 + f'{padding}' * (len(seq1) - len(seq2)) + padded_sequences1.append(seq1) + padded_sequences2.append(padded_seq2) + return padded_sequences1, padded_sequences2 - aligned_identifiers1, aligned_metadata1 = [], [] - aligned_identifiers2, aligned_metadata2 = [], [] - i, j = m, n - score = dp_matrix[m][n] - logger.info(f'Calculated score is: {score}') + def _global_alignment_np(self, sequence1, sequence2, metadata1, metadata2, gap_penalty, + match_score, mismatch_penalty, threads, padding): + """ + Description: + This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`, + using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters + for gap penalties, match scores, and mismatch penalties. - while i > 0 and j > 0: - if identifiers1[i - 1] == identifiers2[j - 1]: - aligned_identifiers1.append(identifiers1[i - 1]) - aligned_identifiers2.append(identifiers2[j - 1]) - if metadata1 is not None and metadata2 is not None: - aligned_metadata1.append(metadata1[i - 1]) - aligned_metadata2.append(metadata2[j - 1]) - i -= 1 - j -= 1 - elif dp_matrix[i][j] == dp_matrix[i - 1][j - 1] + mismatch_penalty: - aligned_identifiers1.append(identifiers1[i - 1]) - aligned_identifiers2.append(identifiers2[j - 1]) - if metadata1 is not None and metadata2 is not None: - aligned_metadata1.append(metadata1[i - 1]) - aligned_metadata2.append(metadata2[j - 1]) - i -= 1 - j -= 1 - elif dp_matrix[i][j] == dp_matrix[i - 1][j] + gap_penalty: - aligned_identifiers1.append(identifiers1[i - 1]) - aligned_identifiers2.append('') - if metadata1 is not None and metadata2 is not None: - aligned_metadata1.append(metadata1[i - 1]) - aligned_metadata2.append(metadata2[j - 1]) - i -= 1 - else: - aligned_identifiers1.append('') - aligned_identifiers2.append(identifiers2[j - 1]) - if metadata1 is not None and metadata2 is not None: - aligned_metadata1.append(metadata1[i - 1]) - aligned_metadata2.append(metadata2[j - 1]) - j -= 1 + The function returns a tuple containing the following elements: + - The aligned longer sequence (list of strings) where gaps are indicated by '-' characters. + - The aligned shorter sequence (list of strings) where gaps are indicated by '-' characters. + - Aligned metadata for sequence1 (list of strings). + - Aligned metadata for sequence2 (list of strings). + - The alignment score (int). - aligned_identifiers1.reverse() - aligned_identifiers2.reverse() - if metadata1 is not None and metadata2 is not None: - aligned_metadata1.reverse() - aligned_metadata2.reverse() + Note: + If additional metadata is not provided (metadata1 or metadata2 is None), the corresponding aligned_metadata + lists will also be None. - padded_sequences1, padded_sequences2 =_pad_sequences(aligned_identifiers1, aligned_identifiers2, padding) - - return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score - + Example: + ```python + sequence1 = "AGCT" + sequence2 = "AAGCT" + aligned_seq1, aligned_seq2, align_metadata1, align_metadata2, score = _global_alignment_np( + sequence1, sequence2, metadata1="ABC", metadata2="XYZ" + ) + ``` -def sequence_alignment(sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-1, - match_score=1, mismatch_penalty=-10, filename="alignment", threads=None, - stockholm=True, fasta=True, clustal=False, padding='-'): - """ - Perform global sequence alignment and save the results in various formats. + """ + identifiers1, metadata1 = self._parse_sequence(sequence1), self._parse_sequence(metadata1) + identifiers2, metadata2 = self._parse_sequence(sequence2), self._parse_sequence(metadata2) + m, n = len(identifiers1), len(identifiers2) + dp_matrix = np.zeros((m + 1, n + 1)) - Parameters: - sequence1 (str): The first sequence to align. - sequence2 (str): The second sequence to align. - metadata1 (str, optional): Metadata for the first sequence. Default is None. - metadata2 (str, optional): Metadata for the second sequence. Default is None. - gap_penalty (int, optional): Penalty for introducing a gap. Default is -1. - match_score (int, optional): Score for a match. Default is 1. - mismatch_penalty (int, optional): Penalty for a mismatch. Default is -10. - filename (str, optional): Name for the output files. Default is "alignment". - threads (int, optional): Number of threads for parallel execution. Default is None. - stockholm (bool, optional): Whether to output in Stockholm format. Default is True. - fasta (bool, optional): Whether to output in FASTA format. Default is True. + progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1)) + with ThreadPoolExecutor(threads) as executor: + for i in range(1, m + 1): + for j in range(1, n + 1): + if identifiers1[i - 1] == identifiers2[j - 1]: + dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score + else: + dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty, + dp_matrix[i][j - 1] + gap_penalty, + dp_matrix[i - 1][j - 1] + mismatch_penalty) + progress_bar.update(1) + progress_bar.close() - Returns: - int: The alignment score. + aligned_identifiers1, aligned_metadata1 = [], [] + aligned_identifiers2, aligned_metadata2 = [], [] - Description: - This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`, - using the Needleman-Wunsch algorithm. It automatically determines the optimal scoring parameters for gap penalties, - match scores, and mismatch penalties based on a sample alignment. + score = dp_matrix[m][n] + logger.info(f'Calculated score is: {score}. (negative is generally bad, positive is good)') + match_count, mismatch_count, gap_count = 0, 0, 0 - The function saves the alignment in various formats based on the specified options: - - Stockholm format if `stockholm` is True. - - FASTA format if `fasta` is True. - - A text file with the aligned sequences and metadata. + # Finds matches/mismatches and introduces gaps if mismatches are found + while m > 0 and n > 0: + if identifiers1[m - 1] == identifiers2[n - 1]: + aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1]) + m -= 1 + n -= 1 + match_count += 1 + elif dp_matrix[m][n] == dp_matrix[m - 1][n - 1] + mismatch_penalty: + aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append(identifiers2[n - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1]) + m -= 1 + n -= 1 + mismatch_count += 1 + elif dp_matrix[m][n] == dp_matrix[m - 1][n] + gap_penalty: + aligned_identifiers1.append(identifiers1[m - 1]), aligned_identifiers2.append('') + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[m - 1]),aligned_metadata2.append(metadata2[n - 1]) + m -= 1 + gap_count += 1 + else: + aligned_identifiers1.append(''), aligned_identifiers2.append(identifiers2[n - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[m - 1]), aligned_metadata2.append(metadata2[n - 1]) + n -= 1 + + logger.info(f'{mismatch_count} mismatches found and {gap_count} gaps found. {match_count} matches found.') - The alignment score is returned as an integer. - - Example: - alignment_score = sequence_alignment("AGTACG", "ATGC", metadata1="abc", metadata2="def", gap_penalty=-2, - match_score=2, mismatch_penalty=-1, filename="my_alignment", - threads=4, stockholm=True, fasta=True) - """ - padded_sequences1, padded_sequences2, aligned_metadata1, \ - aligned_metadata2, score = global_alignment_np(sequence1, sequence2, metadata1, metadata2, gap_penalty, - match_score, mismatch_penalty, threads, padding) - - if metadata1 is not None and metadata2 is not None: - if stockholm is True: - write_stockholm_alignment_with_metadata(padded_sequences1, padded_sequences2, aligned_metadata1, - aligned_metadata2, score, f'{filename}.sto') - if fasta is True: - write_text_format(padded_sequences1, padded_sequences2, score, f'{filename}-text.txt', - aligned_metadata1, aligned_metadata2) - else: - write_text_format(padded_sequences1, padded_sequences2, score, f'{filename}-text.txt') - - record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1") - record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2") - - if fasta is True: - SeqIO.write([record1, record2], f'{filename}.fasta', "fasta") - - if clustal is True: - sequences = [SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1"), - SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2")] - write_clustal_alignment(sequences, f'{filename}.aln') - - return score + aligned_identifiers1.reverse(), aligned_identifiers2.reverse() + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.reverse(), aligned_metadata2.reverse() + padded_sequences1, padded_sequences2 =self._pad_sequences(aligned_identifiers1, aligned_identifiers2, padding) + + return padded_sequences1, padded_sequences2, aligned_metadata1, aligned_metadata2, score