| 1 | import os |
|---|
| 2 | import sys |
|---|
| 3 | import logging |
|---|
| 4 | |
|---|
| 5 | import xapian |
|---|
| 6 | from tokenizer import tokenize |
|---|
| 7 | from settings import DB_FILE |
|---|
| 8 | |
|---|
| 9 | from model import index_data, remove_data |
|---|
| 10 | |
|---|
def xap_read_write():
    """Return the database at ``DB_FILE`` opened with ``DB_CREATE_OR_OPEN``."""
    open_mode = xapian.DB_CREATE_OR_OPEN
    return xapian.flint_open(DB_FILE, open_mode)
|---|
| 13 | |
|---|
def xap_reset():
    """Return the database at ``DB_FILE`` opened with ``DB_CREATE_OR_OVERWRITE``."""
    open_mode = xapian.DB_CREATE_OR_OVERWRITE
    return xapian.flint_open(DB_FILE, open_mode)
|---|
| 16 | |
|---|
def xap_read_only():
    """Return the database at ``DB_FILE`` opened without any creation flag."""
    database = xapian.flint_open(DB_FILE)
    return database
|---|
| 19 | |
|---|
def force_reset():
    """Reset the database by opening it through ``xap_reset``.

    The handle is only needed for the side effect of opening it, so the
    reference is dropped right away and nothing is returned.
    """
    handle = xap_reset()
    del handle
|---|
| 23 | |
|---|
| 24 | from threading import Thread |
|---|
| 25 | import time |
|---|
| 26 | |
|---|
class IndexationWorker(Thread):
    """Background thread applying queued index/remove jobs to Xapian.

    Jobs are read from the ``index_data`` and ``remove_data`` tables, applied
    to the Xapian database inside one transaction and, once that transaction
    has been committed, deleted from the tables.
    """

    def __init__(self, db_file, reset=False):
        """Open the Xapian database the worker writes to.

        :param db_file: path of the Xapian database to open.
        :param reset: when true the database is opened with
            ``DB_CREATE_OR_OVERWRITE`` instead of ``DB_CREATE_OR_OPEN``.
        """
        Thread.__init__(self)
        self.db_file = db_file
        # True while a polling pass is in progress (see _process_pending).
        self.is_working = False
        if reset:
            open_mode = xapian.DB_CREATE_OR_OVERWRITE
        else:
            open_mode = xapian.DB_CREATE_OR_OPEN
        # Honor ``db_file`` rather than the module-level DB_FILE so that a
        # worker can be pointed at any database.
        self.db = xapian.flint_open(db_file, open_mode)
        # Loop flag: set by run(), cleared from outside to stop the thread.
        self.running = False

    def _get_document_internal_id(self, uid):
        """Return the Xapian docid of the document carrying term ``Q<uid>``.

        :returns: the docid of the first match, or ``None`` when the query
            matches nothing.
        """
        enquire = xapian.Enquire(self.db)
        enquire.set_query(xapian.Query('Q%s' % uid))
        matches = list(enquire.get_mset(0, 1))
        if not matches:
            return None
        return matches[0].docid

    def index_document(self, uid, text, language):
        """Index ``text`` under ``uid``, replacing any previous version.

        The text is always tokenized once without a language; when
        ``language`` is not ``None`` it is tokenized a second time with the
        ``lang`` option set and those tokens are appended.

        :param uid: identifier stored on the document as term ``Q<uid>``.
        :param text: content to tokenize.
        :param language: value for the tokenizer ``lang`` option, or ``None``.
        """
        logging.debug('xap:indexing %s', uid)
        # NOTE(review): 'treshold' is the key handed to tokenize() as-is;
        # confirm the tokenizer really expects this spelling.
        options = {'treshold': 2}
        words = tokenize(text, options=options)
        if language is not None:
            options['lang'] = language
            words.extend(tokenize(text, options=options))

        old_docid = self._get_document_internal_id(uid)
        doc = xapian.Document()
        doc.add_term('Q%s' % uid, 1)
        # Postings are numbered from 1, in token order.
        for offset, word in enumerate(words):
            doc.add_posting(word, offset + 1)

        if old_docid is None:
            self.db.add_document(doc)
        else:
            self.db.replace_document(old_docid, doc)

        logging.debug('xap:%s indexed', uid)

    def delete_document(self, uid):
        """Remove the document indexed under ``uid``, if there is one."""
        logging.debug('xap:deleting %s', uid)
        docid = self._get_document_internal_id(uid)
        if docid is not None:
            self.db.delete_document(docid)
            logging.debug('xap:%s deleted', uid)

    def _get_indexables(self):
        """Return the pending index jobs as ``(docid, data, language)`` tuples."""
        rows = index_data.select().execute()
        jobs = [(row.docid, row.data, row.language_iso) for row in rows]
        if jobs:
            logging.debug('xap:reading index table: %s', jobs)
        return jobs

    def _get_removables(self):
        """Return the docids queued for removal."""
        rows = remove_data.select().execute()
        jobs = [row.docid for row in rows]
        if jobs:
            logging.debug('xap:reading delete table: %s', jobs)
        return jobs

    def _index_done(self, docid):
        """Drop the index job of ``docid`` from the ``index_data`` table."""
        # NOTE(review): relies on execute(docid=...) restricting the delete
        # to this row -- confirm against the table API used by ``model``.
        index_data.delete().execute(docid=docid)

    def _remove_done(self, docid):
        """Drop the removal job of ``docid`` from the ``remove_data`` table."""
        # NOTE(review): same assumption as in _index_done -- confirm.
        remove_data.delete().execute(docid=docid)

    def _process_pending(self):
        """Run one polling pass: apply the queued jobs, then clear them.

        ``is_working`` is True for the duration of the pass.  Any exception
        cancels the Xapian transaction and propagates to the caller; in that
        case no job is removed from the SQL tables.
        """
        self.is_working = True
        indexed = []
        removed = []
        try:
            to_index = self._get_indexables()
            self.db.begin_transaction()
            try:
                for docid, data, language in to_index:
                    self.index_document(docid, data, language=language)
                    indexed.append(docid)
                for docid in self._get_removables():
                    self.delete_document(docid)
                    removed.append(docid)
            except:
                # Bare except on purpose: the transaction has to be cancelled
                # whatever was raised, and the error is re-raised untouched.
                self.db.cancel_transaction()
                raise
            else:
                self.db.commit_transaction()
                self.db.flush()

            # The jobs are only cleared once the index has been committed.
            for docid in indexed:
                self._index_done(docid)
            for docid in removed:
                self._remove_done(docid)
        finally:
            self.is_working = False

    def run(self):
        """Poll the job tables every 0.1 second until ``running`` is cleared.

        An exception raised by a pass ends the thread; ``running`` is reset
        to False in that case too, so the flag never claims a dead worker is
        still running.
        """
        self.running = True
        logging.debug('xap:launched')
        logging.debug('xap:database has %d documents', self.db.get_doccount())

        try:
            while self.running:
                self._process_pending()
                time.sleep(.1)
        finally:
            self.running = False

        logging.debug('xap:stopped')
|---|
| 145 | |
|---|
# Module-wide IndexationWorker instance: None until start_server() creates
# one, and set back to None by stop_server().
worker = None
|---|
| 147 | |
|---|
def start_server(reset=False):
    """Create the module-wide indexation worker and start its thread.

    :param reset: forwarded to ``IndexationWorker`` together with ``DB_FILE``.
    """
    global worker
    logging.debug('creating indexation worker over %s' % DB_FILE)
    indexer = IndexationWorker(DB_FILE, reset)
    # Publish the worker before starting it, so the global is already set
    # when the thread begins to run.
    worker = indexer
    indexer.start()
|---|
| 153 | |
|---|
def stop_server():
    """Ask the current worker to stop and wait for its thread to end.

    Does nothing when no worker is registered.  The module-wide ``worker``
    is set back to ``None`` once the thread has been joined.
    """
    global worker
    if worker is None:
        return
    # Clearing the flag lets the worker leave its polling loop.
    worker.running = False
    worker.join()
    worker = None
|---|
| 160 | |
|---|
def is_working():
    """Tell whether the indexation worker is in the middle of a pass.

    :returns: the worker's ``is_working`` flag, or ``False`` when there is
        no worker (``start_server`` not called yet, or ``stop_server``
        already called) instead of failing on ``None``.
    """
    # Read the global once: stop_server() may reset it to None at any time.
    current = worker
    if current is None:
        return False
    return current.is_working
|---|
| 163 | |
|---|
| 164 | from atexit import register |
|---|
# Register stop_server() to be called at interpreter exit.
# NOTE(review): IndexationWorker never sets ``daemon``, so the worker is
# presumably a non-daemon thread -- confirm this hook gets a chance to run
# before the interpreter waits for that thread to finish.
register(stop_server)
|---|