Added:
/trunk/xappy/unittests/multiple_caches.py
Modified:
/trunk/ChangeLog
/trunk/xappy/cachemanager/generic.py
/trunk/xappy/cachemanager/xapian_manager.py
/trunk/xappy/indexerconnection.py
/trunk/xappy/searchconnection.py
=======================================
--- /dev/null
+++ /trunk/xappy/unittests/multiple_caches.py Wed Mar 16 06:37:58 2011
@@ -0,0 +1,214 @@
+# Copyright (C) 2011 Bruno Rezende
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+from __future__ import with_statement
+
+import unittest
+import tempfile, shutil
+from contextlib import contextmanager
+import os
+
+from xappy.cachemanager import XapianCacheManager
+from xappy import IndexerConnection, SearchConnection, FieldActions,
Field,\
+ UnprocessedDocument
+
+@contextmanager
+def tempdir(**kwargs):
+    """Yield the path of a fresh temporary directory; delete it on exit.
+
+    All keyword arguments are forwarded unchanged to ``tempfile.mkdtemp``.
+    """
+    created = tempfile.mkdtemp(**kwargs)
+    try:
+        yield created
+    finally:
+        # Remove the tree even if the body of the ``with`` block raised.
+        shutil.rmtree(created)
+
+
+
+# Sample cache contents, keyed by cache id.  Each cache holds two queries;
+# ``docids`` is the ordered hit list that _create_cache() passes to
+# set_hits() for that query.
+CACHE_SAMPLES = dict([
+    ('1', (
+        dict(queryid=1, queryrepr='term_a', docids=[4, 2]),
+        dict(queryid=2, queryrepr='term_b', docids=[2, 4]),
+    )),
+    ('cache2', (
+        dict(queryid=1, queryrepr='term_a', docids=[3, 4, 1]),
+        dict(queryid=2, queryrepr='term_b', docids=[1, 4, 3]),
+    )),
+])
+
+class TestMultiCacheSearchIndexLoader(unittest.TestCase):
+    """Apply several independent caches to one index and check each one."""
+
+    def _create_index(self, indexpath):
+        """Build a five-document index at ``indexpath``.
+
+        Every document has the terms ``term_a`` and ``term_b`` in ``field``,
+        with wdfs chosen so that the two terms rank the documents in
+        opposite orders.  Document ``'N'`` is written with ``xapid=N``.
+        """
+        iconn = IndexerConnection(indexpath)
+        try:
+            iconn.add_field_action('field', FieldActions.INDEX_FREETEXT,
+                                   language='en')
+
+            documents = [('1', [('term_a', 1), ('term_b', 5)]),
+                         ('2', [('term_a', 2), ('term_b', 4)]),
+                         ('3', [('term_a', 3), ('term_b', 3)]),
+                         ('4', [('term_a', 4), ('term_b', 2)]),
+                         ('5', [('term_a', 5), ('term_b', 1)]),
+                        ]
+
+            for docid, terms in documents:
+                pdoc = self._create_processed_doc(iconn, docid, terms)
+                # Pass the Xapian docid as an int, as test_multiple_cache
+                # does; the hit lists in CACHE_SAMPLES are integer docids.
+                # NOTE: a str here would presumably hit Xapian's
+                # replace_document(unique_term, doc) overload instead.
+                iconn.replace(pdoc, xapid=int(docid))
+            iconn.flush()
+        finally:
+            iconn.close()
+
+    def _create_processed_doc(self, iconn, docid, terms):
+        """Return a processed document with id ``docid``.
+
+        ``terms`` is a list of ``(term, wdf)`` pairs; each is added to
+        ``field`` via ``add_term`` with the given wdf after processing.
+        """
+        xappy_doc = UnprocessedDocument(docid)
+        xappy_doc.fields.append(Field('field', 'term_a'))
+        xappy_doc.fields.append(Field('field', 'term_b'))
+        pdoc = iconn.process(xappy_doc)
+        for term, wdf in terms:
+            pdoc.add_term('field', term, wdf)
+        return pdoc
+
+    def _create_cache(self, cache_path, cache_id):
+        """Fill a cache at ``cache_path`` with CACHE_SAMPLES[cache_id]."""
+        cache_sample = CACHE_SAMPLES[cache_id]
+        cm = XapianCacheManager(cache_path, id=cache_id)
+        try:
+            for querydata in cache_sample:
+                cm.set_queryid(querydata['queryrepr'], querydata['queryid'])
+                cm.set_hits(querydata['queryid'], querydata['docids'])
+            cm.flush()
+        finally:
+            cm.close()
+
+    def _apply_cache(self, indexpath, cachepath, cache_id):
+        """Apply the cache at ``cachepath`` to the index at ``indexpath``."""
+        idx = IndexerConnection(indexpath)
+        cm = XapianCacheManager(cachepath, id=cache_id)
+        try:
+            idx.set_cache_manager(cm)
+            idx.apply_cached_items()
+        finally:
+            idx.close()
+            # Close the cache manager explicitly, as the other helpers do,
+            # rather than leaving it to garbage collection.
+            cm.close()
+
+    def _create_and_apply_cache(self, indexpath, cachepath, cacheid):
+        """Create cache ``cacheid`` at ``cachepath`` and apply it."""
+        self._create_cache(cachepath, cacheid)
+        self._apply_cache(indexpath, cachepath, cacheid)
+
+    def _check_cache_results(self, indexpath, cachepath, cacheid,
+                             expected_results, num_results=10):
+        """Check results for ``term_a`` with and without the cache merged.
+
+        ``expected_results`` is a pair ``(non_cached, cached)`` of lists of
+        document ids: the plain query's results, and the results after
+        merging with cached query 1.  ``num_results`` limits both searches.
+        """
+        cm = XapianCacheManager(cachepath, id=cacheid)
+        try:
+            search_conn = SearchConnection(indexpath)
+            try:
+                search_conn.set_cache_manager(cm)
+
+                query_id, query_term = (1, 'term_a')
+                # The query id stored in the cache must be the one we use.
+                cache_query_id = cm.get_queryid(query_term)
+                self.assertEqual(query_id, cache_query_id)
+
+                non_cached, cached = expected_results
+
+                query = search_conn.query_field('field', query_term)
+                base_result = [r.id for r in query.search(0, num_results)]
+                # Results of the plain query, without merging the cache.
+                self.assertEqual(non_cached, base_result)
+
+                cached_query = query.merge_with_cached(query_id)
+                cached_result = [r.id for r in
+                                 cached_query.search(0, num_results)]
+                # Results once the cached hits are merged in.
+                self.assertEqual(cached, cached_result)
+            finally:
+                search_conn.close()
+        finally:
+            cm.close()
+
+    def test_single_cache_applying(self):
+        with tempdir() as basepath:
+            # create an index
+            indexpath = os.path.join(basepath, 'test_index')
+            self._create_index(indexpath)
+
+            # create and apply cache
+            cachepath = os.path.join(basepath, 'cache')
+            self._create_and_apply_cache(indexpath, cachepath, '1')
+
+            self._check_cache_results(indexpath, cachepath, '1',
+                [['5', '4', '3', '2', '1'], ['4', '2', '5', '3', '1']])
+
+    def test_multiple_cache(self):
+        with tempdir() as basepath:
+            # create an index
+            indexpath = os.path.join(basepath, 'test_index')
+            self._create_index(indexpath)
+
+            base_cachepath = os.path.join(basepath, 'cache')
+            os.makedirs(base_cachepath)
+
+            # create and apply cache 1
+            cachepath1 = os.path.join(base_cachepath, '1')
+            self._create_and_apply_cache(indexpath, cachepath1, '1')
+
+            # create and apply cache 2
+            cachepath2 = os.path.join(base_cachepath, '2')
+            self._create_and_apply_cache(indexpath, cachepath2, 'cache2')
+
+            # test cache 1
+            self._check_cache_results(indexpath, cachepath1, '1',
+                [['5', '4', '3', '2', '1'], ['4', '2', '5', '3', '1']])
+            # test cache 2
+            self._check_cache_results(indexpath, cachepath2, 'cache2',
+                [['5', '4', '3', '2', '1'], ['3', '4', '1', '5', '2']])
+
+            # The document with docid 4 is in both caches.  Check that
+            # replacing it while one cache manager is set does not change
+            # the results of the other cache.
+
+            # replace document
+            iconn = IndexerConnection(indexpath)
+            cm = XapianCacheManager(cachepath2, id='cache2')
+            iconn.set_cache_manager(cm)
+            docid, terms = ('4', [('term_a', 4), ('term_b', 2)])
+            pdoc = self._create_processed_doc(iconn, docid, terms)
+            iconn.replace(pdoc, xapid=int(docid))
+            iconn.flush()
+            iconn.close()
+            cm.close()
+
+            # check if the results in both caches are ok
+            self._check_cache_results(indexpath, cachepath1, '1',
+                [['5', '4', '3', '2', '1'], ['4', '2', '5', '3', '1']])
+            self._check_cache_results(indexpath, cachepath2, 'cache2',
+                [['5', '4', '3', '2', '1'], ['3', '4', '1', '5', '2']])
+
+            # There are two code paths when dealing with caches:
+            #  1. the cache does not hold enough results;
+            #  2. the cache holds enough results.
+            # In the first case the results come from a mixed query against
+            # the index; in the second they come straight from the cache
+            # manager, so the cache managers themselves must be updated.
+            # With multiple cache managers, hits must be removed explicitly
+            # from each cache, and delete() must then be told to ignore the
+            # cache (not try to update it).  A better approach for this
+            # will be developed.
+
+            # remove document: drop its hits from every cache, closing each
+            # cache manager explicitly so its changes are not left to GC.
+            iconn = IndexerConnection(indexpath)
+            for path, cache_id in ((cachepath1, '1'), (cachepath2, 'cache2')):
+                cm = XapianCacheManager(path, id=cache_id)
+                iconn.set_cache_manager(cm)
+                iconn._remove_cached_items(xapid=4)
+                cm.close()
+            iconn.delete(xapid=4, ignore_cache=True)
+            iconn.flush()
+            iconn.close()
+
+            # cache has not enough results
+            self._check_cache_results(indexpath, cachepath1, '1',
+                [['5', '3', '2', '1'], ['2', '5', '3', '1']])
+            self._check_cache_results(indexpath, cachepath2, 'cache2',
+                [['5', '3', '2', '1'], ['3', '1', '5', '2']])
+
+            # cache has enough results
+            self._check_cache_results(indexpath, cachepath1, '1',
+                [['5'], ['2']], num_results=1)
+            self._check_cache_results(indexpath, cachepath2, 'cache2',
+                [['5', '3'], ['3', '1']], num_results=2)
+
+# Allow running this test module directly with the Python interpreter.
+if __name__ == '__main__':
+    unittest.main()
=======================================
--- /trunk/ChangeLog Mon Dec 20 06:25:09 2010
+++ /trunk/ChangeLog Wed Mar 16 06:37:58 2011
@@ -1,3 +1,10 @@
+Wed Mar 16 13:37:17 GMT 2011 Richard Boulton <ric...@tartarus.org>
+
+ * xappy/cachemanager/generic.py,xappy/cachemanager/xapian_manager.py,
+ xappy/indexerconnection.py,xappy/searchconnection.py,
+ xappy/unittests/multiple_caches.py: Patch from Bruno Rezende -
+ allow multiple caches to be applied. Closes ticket #36.
+
Mon Dec 20 14:24:39 GMT 2010 Richard Boulton <ric...@tartarus.org>
* xappy/searchresults.py,xappy/unittests/collapse.py: Add access to
=======================================
--- /trunk/xappy/cachemanager/generic.py Thu Sep 9 08:25:30 2010
+++ /trunk/xappy/cachemanager/generic.py Wed Mar 16 06:37:58 2011
@@ -1,6 +1,7 @@
#!/usr/bin/env python
#
# Copyright (C) 2009,2010 Richard Boulton
+# Copyright (C) 2011 Bruno Rezende
#
# Permission is hereby granted, free of charge, to any person obtaining a
copy
# of this software and associated documentation files (the "Software"), to
deal
@@ -477,6 +478,14 @@
if nextid <= queryid:
self['I'] = self.encode_int(queryid + 1)
+    def num_cached_queries(self):
+        """Return the size of the value-slot range this cache needs.
+
+        The ``'I'`` key holds the next unused query id; it is bumped past
+        any query id that is recorded.  The result is that stored value
+        plus one, with a missing (empty) value treated as 0, so an empty
+        cache reports 1.
+
+        NOTE(review): if query ids start at 0, the stored next id alone
+        already covers every id in use, so the ``+ 1`` reserves one extra
+        slot per cache.  Confirm whether that is intended.
+        """
+        raw_next_id = self['I']
+        next_id = 0 if len(raw_next_id) == 0 else self.decode_int(raw_next_id)
+        return next_id + 1
+
@staticmethod
def make_hit_chunk_key(queryid, chunk):
"""Make the key for looking up a particular chunk for a given
queryid.
=======================================
--- /trunk/xappy/cachemanager/xapian_manager.py Wed Dec 30 07:04:16 2009
+++ /trunk/xappy/cachemanager/xapian_manager.py Wed Mar 16 06:37:58 2011
@@ -1,6 +1,7 @@
#!/usr/bin/env python
#
# Copyright (C) 2009 Richard Boulton
+# Copyright (C) 2011 Bruno Rezende
#
# Permission is hereby granted, free of charge, to any person obtaining a
copy
# of this software and associated documentation files (the "Software"), to
deal
@@ -27,6 +28,10 @@
import generic
import os
import shutil
+try:
+ import simplejson as json
+except ImportError:
+ import json
import xapian
class XapianCacheManager(generic.KeyValueStoreCacheManager):
@@ -40,11 +45,14 @@
provide other implementations of iter_by_docid(), which may be more
efficient for some situations.
+    Multiple caches may be applied to the same index by giving each cache
+    manager a distinct ``id`` (for example the default '1', or 'cache2').
+
"""
- def __init__(self, dbpath, chunksize=None):
+ def __init__(self, dbpath, chunksize=None, id='1'):
self.dbpath = dbpath
self.db = None
self.writable = False
+ # Identifier of this cache.  Several caches can be applied to one index;
+ # cache_manager_slot_start() looks this id up to find the cache's slots.
+ # NOTE(review): the id is a key in a JSON-encoded table whose keys always
+ # decode as strings, so string ids are the safe choice.
+ self.id = id
generic.KeyValueStoreCacheManager.__init__(self, chunksize)
def __getitem__(self, key):
@@ -186,3 +194,27 @@
finally:
invdb.close()
+
+def encode(obj):
+    """Serialise ``obj`` (the per-index table of applied caches) to JSON."""
+    # json.dumps' second positional parameter is ``skipkeys``, not an
+    # indent; leave it at its default so unsupported keys raise instead of
+    # being silently dropped from the stored table.
+    return json.dumps(obj)
+
+
+# Inverse of ``encode``: parse the JSON text back into Python objects.
+decode = json.loads
+
+
+def get_caches(conn):
+    """Return the table of caches applied to ``conn``'s index.
+
+    The table is read from the ``'caches'`` index metadata and maps cache
+    ids to slot offsets relative to BASE_CACHE_SLOT.  An empty dict is
+    returned when nothing has been stored.  JSON object keys always decode
+    as strings.
+    """
+    caches_meta = conn._index.get_metadata('caches')
+    return decode(caches_meta) if caches_meta else {}
+
+
+def set_caches(iconn):
+    """Store ``iconn._caches`` in the ``'caches'`` index metadata."""
+    iconn._index.set_metadata('caches', encode(iconn._caches))
+
+
+# Value slots at or above this number hold cache data; each applied cache
+# gets its own offset above it.
+BASE_CACHE_SLOT = 10000
+
+
+def cache_manager_slot_start(conn):
+    """Return the first value slot used by ``conn``'s current cache manager.
+
+    The cache table is read from the index once and memoised on ``conn`` as
+    ``conn._caches``, so later metadata changes are not picked up by this
+    connection.  A cache id missing from the table maps to offset 0, i.e.
+    BASE_CACHE_SLOT itself.
+
+    NOTE(review): apply_cached_items() records a new cache's offset *after*
+    adding that cache's size to the running total, so slot ranges can
+    overlap when a later cache is smaller than an earlier one.  Confirm.
+    """
+    if not hasattr(conn, '_caches'):
+        conn._caches = get_caches(conn)
+    cache_id = conn.cache_manager.id
+    offset = conn._caches.get(cache_id)
+    if offset is None:
+        # A table reloaded from JSON has string keys, so also try the
+        # string form of a non-string id before falling back to 0.
+        offset = conn._caches.get(str(cache_id), 0)
+    return BASE_CACHE_SLOT + offset
=======================================
--- /trunk/xappy/indexerconnection.py Fri Nov 5 06:14:52 2010
+++ /trunk/xappy/indexerconnection.py Wed Mar 16 06:37:58 2011
@@ -1,5 +1,6 @@
# Copyright (C) 2007,2008,2009 Lemur Consulting Ltd
# Copyright (C) 2009 Richard Boulton
+# Copyright (C) 2011 Bruno Rezende
#
# Permission is hereby granted, free of charge, to any person obtaining a
copy
# of this software and associated documentation files (the "Software"), to
deal
@@ -29,6 +30,9 @@
import cachemanager
from datastructures import *
+from cachemanager.xapian_manager import BASE_CACHE_SLOT, \
+ cache_manager_slot_start, get_caches, set_caches
+
import errors
from fieldactions import ActionContext, FieldActions, ActionSet
import fieldmappings
@@ -58,10 +62,6 @@
_index = None
- # Slots after this number are used for the cache manager.
- # FIXME - don't hard-code this - put it in the settings instead?
- _cache_manager_slot_start = 10000
-
# Maximum number of hits ever stored in a cache for a single query.
# This is just used to calculate an appropriate value to store for the
# weight for this item.
@@ -122,6 +122,11 @@
self._mem_buffered = 0
self.set_max_mem_use()
+    @property
+    def _cache_manager_slot_start(self):
+        """First value slot used by the currently set cache manager.
+
+        Slots from here upwards hold that cache's data.  The result depends
+        on which cache manager is set, because each applied cache is given
+        its own offset above BASE_CACHE_SLOT.
+        """
+        first_slot = cache_manager_slot_start(self)
+        return first_slot
+
def __del__(self):
self.close()
@@ -413,18 +418,7 @@
xapdoc = document.prepare()
if self._index.get_metadata('_xappy_hascache'):
- if store_only:
- # Remove any cached items from the cache - the document is
no
- # longer wanted in search results.
- self._remove_cached_items(id, xapid)
- else:
- # Copy any cached query items over to the new document.
- olddoc, olddocid = self._get_xapdoc(id, xapid)
- if olddoc is not None:
- for value in olddoc.values():
- if value.num < self._cache_manager_slot_start:
- continue
- xapdoc.add_value(value.num, value.value)
+ self._replace_cached_item(xapdoc, id, xapid, store_only)
if xapid is None:
self._index.replace_document('Q' + id, xapdoc)
@@ -436,6 +430,20 @@
if self._mem_buffered > self._max_mem:
self.flush()
+    def _replace_cached_item(self, newdoc, id, xapid, store_only):
+        """Carry cache data over to ``newdoc`` when a document is replaced.
+
+        If ``store_only`` is true, the document should no longer appear in
+        search results, so its cached hits are removed via
+        _remove_cached_items() (which only updates the current cache
+        manager).  Otherwise, every value in a slot at or above
+        BASE_CACHE_SLOT on the existing document (found by ``id`` or
+        ``xapid``) is copied onto ``newdoc``.  This covers the slots of all
+        applied caches, not just the current one.
+        """
+        if store_only:
+            self._remove_cached_items(id, xapid)
+            return
+        olddoc, _olddocid = self._get_xapdoc(id, xapid)
+        if olddoc is None:
+            return
+        for cached_value in olddoc.values():
+            if cached_value.num >= BASE_CACHE_SLOT:
+                newdoc.add_value(cached_value.num, cached_value.value)
+
def _make_synonym_key(self, original, field):
"""Make a synonym key (ie, the term or group of terms to store in
xapian).
@@ -670,8 +678,17 @@
return
#print "Removing docid=%d" % xapid
+ # FIXME: this will only remove the hits from the set cache
+ # manager, if we have multiple applied caches, the others won't be
+ # updated. This means that currently, if multiple caches are
applied
+ # and document removals happen, some of the caches will get out of
+ # date; multiple caches are therefore not really suitable for use
in
+ # production systems - they are however useful for experimenting
with
+ # different caching algorithms.
for value in doc.values():
- if value.num < self._cache_manager_slot_start:
+ base_slot = self._cache_manager_slot_start
+ upper_slot = self._cache_manager_slot_start +
self.cache_manager.num_cached_queries()
+ if not (base_slot <= value.num < upper_slot):
continue
rank = int(self._cache_manager_max_hits -
xapian.sortable_unserialise(value.value))
@@ -679,7 +696,7 @@
value.num - self._cache_manager_slot_start,
((rank, xapid),))
- def delete(self, id=None, xapid=None):
+ def delete(self, id=None, xapid=None, ignore_cache=False):
"""Delete a document from the search engine index.
If the id does not already exist in the database, this method
@@ -689,12 +706,15 @@
the Xapian document ID to delete. In this case, the Xappy
document ID
will be not be checked.
+ If `ignore_cache` is set to True, items will not be removed from
any
+ caches which are applied to the system.
+
"""
if self._index is None:
raise errors.IndexerError("IndexerConnection has been closed")
# Remove any cached items from the cache.
- if self._index.get_metadata('_xappy_hascache'):
+ if not ignore_cache and
self._index.get_metadata('_xappy_hascache'):
self._remove_cached_items(id, xapid)
# Now, remove the actual document.
@@ -734,6 +754,18 @@
raise RuntimeError("Need to set a cache manager before
calling "
"apply_cached_items()")
+ self._caches = get_caches(self)
+
+ cache_id = self.cache_manager.id
+ num_cached_queries = self.cache_manager.num_cached_queries()
+ num_cache_slots = int(self._index.get_metadata('num_cache_slots')
or '0')
+ num_cache_slots += num_cached_queries
+ self._index.set_metadata('num_cache_slots', str(num_cache_slots))
+
+ if cache_id not in self._caches:
+ self._caches[cache_id] = num_cache_slots
+ set_caches(self)
+
# Remember that a cache manager has been applied in the metadata,
so
# errors can be raised if it's not set during future modifications.
self._index.set_metadata('_xappy_hascache', '1')
=======================================
--- /trunk/xappy/searchconnection.py Mon Feb 22 16:36:53 2010
+++ /trunk/xappy/searchconnection.py Wed Mar 16 06:37:58 2011
@@ -1,6 +1,7 @@
# Copyright (C) 2007,2008,2009 Lemur Consulting Ltd
# Copyright (C) 2009 Pablo Hoffman
# Copyright (C) 2009 Richard Boulton
+# Copyright (C) 2011 Bruno Rezende
#
# Permission is hereby granted, free of charge, to any person obtaining a
copy
# of this software and associated documentation files (the "Software"), to
deal
@@ -34,6 +35,7 @@
import xapian
from cache_search_results import CacheResultOrdering
import cachemanager
+from cachemanager.xapian_manager import cache_manager_slot_start
from datastructures import UnprocessedDocument, ProcessedDocument
from fieldactions import ActionContext, FieldActions, \
ActionSet, SortableMarshaller, convert_range_to_term, \
@@ -80,10 +82,6 @@
_index = None
- # Slots after this number are used for the cache manager.
- # FIXME - don't hard-code this - put it in the settings instead?
- _cache_manager_slot_start = 10000
-
def __init__(self, indexpath):
"""Create a new connection to the index for searching.
@@ -107,6 +105,11 @@
self._index = None
raise
self._imgterms_cache = {}
+
+    @property
+    def _cache_manager_slot_start(self):
+        """First value slot holding data for the current cache manager.
+
+        Each applied cache has its own offset above BASE_CACHE_SLOT, so the
+        value depends on which cache manager has been set on this
+        connection.
+        """
+        first_slot = cache_manager_slot_start(self)
+        return first_slot
def __del__(self):
self.close()