root / xap / trunk / xapindexer.py

Revision 225:8ae6a7d30952, 4.7 kB (checked in by Lafaye Philippe (RAGE2000) <lafaye@…>, 7 months ago)

Configure the database engine for return unicode and we d'ont need to convert
it in the indexer

Line 
1import os
2import sys
3import logging
4
5import xapian
6from tokenizer import tokenize
7from settings import DB_FILE
8
9from model import index_data, remove_data
10
11def xap_read_write():
12    return xapian.flint_open(DB_FILE, xapian.DB_CREATE_OR_OPEN)
13
14def xap_reset():
15    return xapian.flint_open(DB_FILE, xapian.DB_CREATE_OR_OVERWRITE)
16
17def xap_read_only():
18    return xapian.flint_open(DB_FILE)
19
20def force_reset():
21    db = xap_reset()
22    del db
23
24from threading import Thread
25import time
26
27class IndexationWorker(Thread):
28    """reads the SQLDB to do the jobs"""
29
30    def __init__(self, db_file, reset=False):
31        Thread.__init__(self)
32        self.db_file = db_file
33        self.is_working = False
34        if not reset:
35            self.db = xap_read_write()
36        else:
37            self.db = xap_reset()
38        self.running = False
39
40    def _get_document_internal_id(self, uid):
41        """retrieves a document"""
42        enquire = xapian.Enquire(self.db)
43        query = xapian.Query('Q%s' % uid)
44        enquire.set_query(query)
45        res = list(enquire.get_mset(0, 1))
46        if len(res) == 0:
47            return None
48        return res[0].docid
49
50    def index_document(self, uid, text, language):
51        """indexes the given document"""
52        logging.debug('xap:indexing %s' % uid)
53        options = {'treshold': 2}
54        words = tokenize(text, options=options)
55        if language is not None:
56            options['lang'] = language
57            words.extend(tokenize(text, options=options))
58
59        old_docid = self._get_document_internal_id(uid)
60        doc = xapian.Document()
61        doc.add_term('Q%s' % uid, 1)
62        i = 1
63        for word in words:
64            doc.add_posting(word, i)
65            i += 1
66
67        if old_docid is None:
68            self.db.add_document(doc)
69        else:
70            self.db.replace_document(old_docid, doc)
71
72        logging.debug('xap:%s indexed' % uid)
73
74    def delete_document(self, uid):
75        """removes the document"""
76        logging.debug('xap:deleting %s' % uid)
77        docid = self._get_document_internal_id(uid)
78        if docid is not None:
79            self.db.delete_document(docid)
80        logging.debug('xap:%s deleted' % uid)
81
82    def _get_indexables(self):
83        res = index_data.select().execute()
84        jobs = [(item.docid, item.data, item.language_iso) for item in res]
85        if jobs != []:
86            logging.debug('xap:reading index table: %s' % str(jobs))
87        return jobs
88
89    def _get_removables(self):
90        res = remove_data.select().execute()
91        jobs = [item.docid for item in res]
92        if jobs != []:
93            logging.debug('xap:reading delete table: %s' % str(jobs))
94        return jobs
95
96    def _index_done(self, docid):
97        index_data.delete().execute(docid=docid)
98
99    def _remove_done(self, docid):
100        remove_data.delete().execute(docid=docid)
101
102    def run(self):
103        self.running = True
104        logging.debug('xap:launched')
105        logging.debug('xap:database has %d documents' % self.db.get_doccount())
106
107        while self.running:
108            # index
109            self.is_working = True
110            indexed = []
111            removed = []
112            try:
113                try:
114                    to_index = self._get_indexables()
115                    self.db.begin_transaction()
116                    try:
117                        for docid, data, language in to_index:
118                            self.index_document(docid, data, language=language)
119                            indexed.append(docid)
120                        # remove
121                        to_remove = self._get_removables()
122                        for docid in to_remove:
123                            self.delete_document(docid)
124                            removed.append(docid)
125                    except:
126                        self.db.cancel_transaction()
127                        raise
128                    else:
129                        self.db.commit_transaction()
130                        self.db.flush()
131
132                    # now cleaning sql tables
133                    for docid in indexed:
134                        self._index_done(docid)
135
136                    for docid in removed:
137                        self._remove_done(docid)
138                except:
139                    raise
140            finally:
141                self.is_working = False
142                time.sleep(.1)
143
144        logging.debug('xap:stopped')
145
146worker = None
147
148def start_server(reset=False):
149    global worker
150    logging.debug('creating indexation worker over %s' % (DB_FILE))
151    worker = IndexationWorker(DB_FILE, reset)
152    worker.start()
153
154def stop_server():
155    global worker
156    if worker is not None:
157        worker.running = False
158        worker.join()
159        worker = None
160
161def is_working():
162    return worker.is_working
163
164from atexit import register
165register(stop_server)
Note: See TracBrowser for help on using the browser.