Archive Ensembl HomeArchive Ensembl Home
BlastAndParseDistances.pm
Go to the documentation of this file.
00001 package Bio::EnsEMBL::Compara::RunnableDB::Families::BlastAndParseDistances;
00002 
00003 use strict;
00004 use FileHandle;
00005 
00006 use base ('Bio::EnsEMBL::Compara::RunnableDB::BaseRunnable');
00007 
00008 sub load_fasta_sequences_from_db {
00009     my ($self, $start_seq_id, $minibatch) = @_;
00010 
00011     my $idprefixed              = $self->param('idprefixed')  || 0;
00012     my $debug                   = $self->debug() || $self->param('debug') || 0;
00013 
00014     my $sql = qq {
00015         SELECT s.sequence_id, m.stable_id, s.sequence
00016           FROM member m, sequence s
00017          WHERE s.sequence_id BETWEEN ? AND ?
00018            AND m.sequence_id=s.sequence_id
00019       GROUP BY s.sequence_id
00020       ORDER BY s.sequence_id
00021     };
00022 
00023     if($debug) {
00024         print "SQL:  $sql\n";
00025     }
00026 
00027     my $sth = $self->compara_dba->dbc->prepare( $sql );
00028     $sth->execute( $start_seq_id, $start_seq_id+$minibatch-1 );
00029 
00030     my @fasta_list = ();
00031     while( my ($seq_id, $stable_id, $seq) = $sth->fetchrow() ) {
00032         $seq=~ s/(.{72})/$1\n/g;
00033         chomp $seq;
00034         push @fasta_list, ($idprefixed
00035                                 ? ">seq_id_${seq_id}_${stable_id}\n$seq\n"
00036                                 : ">$stable_id sequence_id=$seq_id\n$seq\n") ;
00037     }
00038     $sth->finish();
00039     $self->compara_dba->dbc->disconnect_when_inactive(1);
00040 
00041     return \@fasta_list;
00042 }
00043 
00044 sub load_name2index_mapping_from_db {
00045     my ($self) = @_;
00046 
00047     my $sql = qq {
00048         SELECT sequence_id, stable_id
00049           FROM member
00050          WHERE sequence_id
00051       GROUP BY sequence_id
00052     };
00053 
00054     my $sth = $self->compara_dba->dbc->prepare( $sql );
00055     $sth->execute();
00056 
00057     my %name2index = ();
00058     while( my ($seq_id, $stable_id) = $sth->fetchrow() ) {
00059         $name2index{$stable_id} = $seq_id;
00060     }
00061     $sth->finish();
00062     $self->compara_dba->dbc->disconnect_when_inactive(1);
00063 
00064     return \%name2index;
00065 }
00066 
00067 sub name2index { # can load the name2index mapping from db/file if necessary
00068     my ($self, $name) = @_;
00069 
00070     if($name=~/^seq_id_(\d+)_/) {
00071         return $1;
00072     } else {
00073         my $name2index = $self->param('name2index') || $self->param('name2index', $self->load_name2index_mapping_from_db() );
00074 
00075         return $name2index->{$name};
00076     }
00077 }
00078 
00079 sub fetch_input {
00080     my $self = shift @_;
00081 
00082     my $start_seq_id            = $self->param('sequence_id') || die "'sequence_id' is an obligatory parameter, please set it in the input_id hashref";
00083     my $minibatch               = $self->param('minibatch')   || 1;
00084     my $debug                   = $self->debug() || $self->param('debug') || 0;
00085 
00086     my $fasta_list = $self->load_fasta_sequences_from_db($start_seq_id, $minibatch);
00087 
00088     if(scalar(@$fasta_list)<$minibatch) {
00089         die "Could not load all ($minibatch) sequences, please investigate";
00090     }
00091 
00092     if($debug) {
00093         print "Loaded ".scalar(@$fasta_list)." sequences\n";
00094     }
00095 
00096     $self->param('fasta_list', $fasta_list);
00097 }
00098 
00099 sub parse_blast_table_into_matrix_hash {
00100     my ($self, $filename, $min_self_dist) = @_;
00101 
00102     my $roundto    = $self->param('roundto') || 0.0001;
00103 
00104     my %matrix_hash  = ();
00105 
00106     my $curr_name    = '';
00107     my $curr_index   = 0;
00108 
00109     open(BLASTTABLE, "<$filename") || die "Could not open the blast table file '$filename'";
00110     while(my $line = <BLASTTABLE>) {
00111 
00112         if($line=~/^#/) {
00113             if($line=~/^#\s+Query:\s+(\S+)/) {
00114                 $curr_name  = $1;
00115                 $curr_index = $self->name2index($curr_name)
00116                     || die "Parser could not map '$curr_name' to sequence_id";
00117 
00118                 $matrix_hash{$curr_index}{$curr_index} = $min_self_dist;   # stop losing singletons whose evalue to themselves is *above* 'evalue_limit' threshold
00119                                                                            # (that is, the ones that are "not even similar to themselves")
00120             }
00121         } else {
00122             my ($qname, $hname, $evalue) = split(/\s+/, $line);
00123 
00124             my $hit_index = $self->name2index($hname);
00125                 # we MUST be explicitly numeric here:
00126             my $distance  = ($evalue != 0) ? -log($evalue)/log(10) : 200;
00127 
00128                 # do the rounding to prevent the unnecessary growth of tables/files
00129             $distance = int($distance / $roundto) * $roundto;
00130 
00131             $matrix_hash{$curr_index}{$hit_index} = $distance;
00132         }
00133     }
00134     close BLASTTABLE;
00135 
00136     return \%matrix_hash;
00137 }
00138 
00139 sub run {
00140     my $self = shift @_;
00141 
00142     my $fasta_list              = $self->param('fasta_list'); # set by fetch_input()
00143     my $debug                   = $self->debug() || $self->param('debug') || 0;
00144 
00145     unless(scalar(@$fasta_list)) { # if we have no more work to do just exit gracefully
00146         if($debug) {
00147             warn "No work to do, exiting\n";
00148         }
00149         return;
00150     }
00151 
00152     my $blastdb_dir             = $self->param('blastdb_dir');
00153     my $blastdb_name            = $self->param('blastdb_name')  || die "'blastdb_name' is an obligatory parameter";
00154 
00155     my $minibatch               = $self->param('minibatch')     || 1;
00156 
00157     my $blast_bin_dir           = $self->param('blast_bin_dir') || '/software/ensembl/compara/ncbi-blast-2.2.23+/bin';
00158     my $blast_params            = $self->param('blast_params')  || '';  # no parameters to C++ binary means having composition stats on and -seg masking off
00159     my $evalue_limit            = $self->param('evalue_limit')  || 0.00001;
00160     my $tophits                 = $self->param('tophits')       || 250;
00161 
00162 
00163     my $blast_infile  = '/tmp/family_blast.in.'.$$;     # only for debugging
00164     my $blast_outfile = '/tmp/family_blast.out.'.$$;    # looks like inevitable evil (tried many hairy alternatives and failed)
00165 
00166     if($debug) {
00167         open(FASTA, ">$blast_infile") || die "Could not open '$blast_infile' for writing";
00168         print FASTA @$fasta_list;
00169         close FASTA;
00170     }
00171 
00172     my $blastdb = ($blastdb_dir ? $blastdb_dir.'/' : '').$blastdb_name;
00173     my $cmd = "${blast_bin_dir}/blastp -db $blastdb $blast_params -evalue $evalue_limit -num_descriptions $tophits -out $blast_outfile -outfmt '7 qacc sacc evalue'";
00174 
00175     if($debug) {
00176         warn "CMD:\t$cmd\n";
00177     }
00178 
00179     open( BLAST, "| $cmd") || die qq{could not execute "${cmd}", returned error code: $!};
00180     print BLAST @$fasta_list;
00181     close BLAST;
00182 
00183     my $matrix_hash = $self->parse_blast_table_into_matrix_hash($blast_outfile, -log($evalue_limit)/log(10) );
00184 
00185     my $expected_elements   = scalar(@$fasta_list);
00186     my $parsed_elements     = scalar(keys %$matrix_hash);
00187     unless($parsed_elements == $expected_elements) {
00188         die "Could only parse $parsed_elements out of $expected_elements";
00189     }
00190 
00191     $self->param('matrix_hash', $matrix_hash);        # store it in a parameter
00192 
00193     unless($debug) {
00194         unlink $blast_outfile;
00195     }
00196 }
00197 
00198 sub write_output {
00199     my $self = shift @_;
00200 
00201     if(my $matrix_hash = $self->param('matrix_hash')) {
00202         my @output_ids = ();
00203         while(my($row_id, $subhash) = each %$matrix_hash) {
00204             while(my($column_id, $value) = each %$subhash) {
00205                 push @output_ids, { 'row_id' => $row_id, 'column_id' => $column_id, 'value' => $value};
00206             }
00207         }
00208         $self->dataflow_output_id( \@output_ids, 3);
00209     }
00210 }
00211 
00212 1;
00213