[omapd] r185 committed - added subscription index

1 view
Skip to first unread message

om...@googlecode.com

unread,
Jun 5, 2013, 12:19:18 PM6/5/13
to omapd...@googlegroups.com
Revision: 185
Author: fu...@asguardnetworks.com
Date: Wed Jun 5 09:18:54 2013
Log: added subscription index
http://code.google.com/p/omapd/source/detail?r=185

Modified:
/branches/lfu/clienthandler.cpp
/branches/lfu/mapsessions.cpp
/branches/lfu/mapsessions.h
/branches/lfu/subscription.cpp
/branches/lfu/subscription.h

=======================================
--- /branches/lfu/clienthandler.cpp Wed May 29 14:07:38 2013
+++ /branches/lfu/clienthandler.cpp Wed Jun 5 09:18:54 2013
@@ -993,8 +993,16 @@
Subscription* sub = new Subscription(subReq.requestVersion());
sub->_name = subOper.name();
sub->_search = subOper.search();
+ sub->_authToken = _authToken;
int currentDepth = -1;
buildSearchGraph(*sub, sub->_search.startId(), currentDepth);
+
+ // add all identifiers to the index.
+ const QSet<Id> sub_ids(sub->identifiers());
+ QSetIterator<Id> setIt(sub_ids);
+ while(setIt.hasNext()) {
+ MapSessions::getInstance()->addToIndex(setIt.next(), sub);
+ }

if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
qDebug() << __PRETTY_FUNCTION__ << ":" << "Subscription:"
<< subOper.name();
@@ -1079,6 +1087,8 @@

if (!requestError) {
Subscription tempSub(searchReq.requestVersion());
+ // don't include this subscription in the subscription index:
+ tempSub._indexed = false;
tempSub._search = searchReq.search();

int currentDepth = -1;
@@ -1700,174 +1710,190 @@
// new link could link two separate sub-graphs together or a
deleted link
// could prune a graph into two separate sub-graphs.

- QHash<QString, QList<Subscription*> >
allSubLists(_mapSessions->subscriptionLists());
- QMutableHashIterator<QString,QList<Subscription*> >
allSubsIt(allSubLists);
+ // QHash<QString, QList<Subscription*> >
allSubLists(_mapSessions->subscriptionLists());
+ // LFu - only consider subscriptions that include the new/deleted
Id(s):
+ QSet<Subscription*> allSubLists =
MapSessions::getInstance()->getSubscriptionsForIdentifier(link.first);
+ if (isLink)
+ allSubLists +=
MapSessions::getInstance()->getSubscriptionsForIdentifier(link.second);
+
+ // QMutableHashIterator<QString,QList<Subscription*> >
allSubsIt(allSubLists);
+ QMutableSetIterator<Subscription*> allSubsIt(allSubLists);

while (allSubsIt.hasNext()) {
- allSubsIt.next();
- const QString& authToken = allSubsIt.key();
+ Subscription* sub = allSubsIt.next();
+ const QString& authToken = sub->_authToken;
QString pubId = _mapSessions->pubIdForAuthToken(authToken);
- QList<Subscription*>& subList = allSubsIt.value();
+ // QList<Subscription*>& subList = allSubsIt.value();
+ // if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ // qDebug() << __PRETTY_FUNCTION__ << ":" << "publisher:" <<
pubId << "has num subscriptions:" << subList.size();
+ // }
+
+ // bool publisherHasDirtySub = false;
+ // QMutableListIterator<Subscription*> subIt(subList);
+ // while (subIt.hasNext()) {
+
if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":" << "publisher:" <<
pubId << "has num subscriptions:" << subList.size();
+ qDebug() << __PRETTY_FUNCTION__ << ":" << "--checking
subscription named:" << sub->_name;
}

- // bool publisherHasDirtySub = false;
- QMutableListIterator<Subscription*> subIt(subList);
- while (subIt.hasNext()) {
- Subscription* sub = subIt.next();
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":" << "--checking
subscription named:" << sub->_name;
- }
+ QSet<Id> idsWithConnectedGraphUpdates,
idsWithConnectedGraphDeletes;
+ QSet<Link> linksWithConnectedGraphUpdates,
linksWithConnectedGraphDeletes;
+ bool modifiedSearchGraph = false;
+ bool subIsDirty = false;

- QSet<Id> idsWithConnectedGraphUpdates,
idsWithConnectedGraphDeletes;
- QSet<Link> linksWithConnectedGraphUpdates,
linksWithConnectedGraphDeletes;
- bool modifiedSearchGraph = false;
- bool subIsDirty = false;
-
- if (! isLink) {
- if (sub->_ids.contains(link.first)) {
- // Case 1.
- subIsDirty = true;
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with id in SearchGraph:" << link.first;
- }
+ if (! isLink) {
+ if (sub->_ids.contains(link.first)) {
+ // Case 1.
+ subIsDirty = true;
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with id in SearchGraph:" << link.first;
}
- } else {
+ }
+ } else {

- bool isLinkInSub = sub->_linkList.contains(link);
- bool oneIdInSub = sub->_ids.contains(link.first) ||
sub->_ids.contains(link.second);
+ bool isLinkInSub = sub->_linkList.contains(link);
+ bool oneIdInSub = sub->_ids.contains(link.first) ||
sub->_ids.contains(link.second);

- if (isLinkInSub && publishType == Meta::PublishDelete) {
- // Case 3.
- subIsDirty = true;
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with link in SearchGraph:" << link;
- }
+ if (isLinkInSub && publishType == Meta::PublishDelete) {
+ // Case 3.
+ subIsDirty = true;
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with link in SearchGraph:" << link;
}
+ }

- if (isLinkInSub && publishType == Meta::PublishUpdate) {
- // Case 2.
- subIsDirty = true;
- } else if(subIsDirty || (oneIdInSub && publishType ==
Meta::PublishUpdate)) {
- // (LFu) - the way this else was written before, it
also covered the case:
- // !sub._linkList.contains(link) && publishType ==
Meta::PublishDelete, which we should ignore
+ if (isLinkInSub && publishType == Meta::PublishUpdate) {
+ // Case 2.
+ subIsDirty = true;
+ } else if(subIsDirty || (oneIdInSub && publishType ==
Meta::PublishUpdate)) {
+ // (LFu) - the way this else was written before, it also
covered the case:
+ // !sub._linkList.contains(link) && publishType ==
Meta::PublishDelete, which we should ignore

- // Case 4. (and search graph rebuild for case 3)
+ // Case 4. (and search graph rebuild for case 3)

- // (LFu) Changes:
- // - when adding a link that contributes to the
subscription's search graph, extend the search graph
- // rather than rebuilding it completely. This will
cause non-matching links or links attached to an
- // identifier at max search depth to become noops
+ // (LFu) Changes:
+ // - when adding a link that contributes to the
subscription's search graph, extend the search graph
+ // rather than rebuilding it completely. This will cause
non-matching links or links attached to an
+ // identifier at max search depth to become noops

- bool needFullRebuild = (publishType ==
Meta::PublishUpdate ? false : true);
+ bool needFullRebuild = (publishType ==
Meta::PublishUpdate ? false : true);

- // TODO: we should also be able to optimize removal of
a link, e.g. any link that ends in an identifier
- // of max subs search depth can be removed in
isolation (i.e. no subs graph must be calculated)
+ // TODO: we should also be able to optimize removal of a
link, e.g. any link that ends in an identifier
+ // of max subs search depth can be removed in isolation
(i.e. no subs graph must be calculated)

- QMap<Id, int> existingIdList = sub->_ids;
- QSet<Link> existingLinkList = sub->_linkList;
- int currentDepth = -1;
- Id startId;
- if(needFullRebuild)
- {
- sub->_ids.clear();
- sub->_linkList.clear();
- startId = sub->_search.startId();
- // qDebug() << "~~making full search graph rebuild
for sub " << sub->_name;
- }
- else
+ QMap<Id, int> existingIdList = sub->_ids;
+ QSet<Link> existingLinkList = sub->_linkList;
+ int currentDepth = -1;
+ Id startId;
+ if(needFullRebuild)
+ {
+ sub->_ids.clear();
+ sub->_linkList.clear();
+ startId = sub->_search.startId();
+ // qDebug() << "~~making full search graph rebuild for
sub " << sub->_name;
+ }
+ else
+ {
+ // determine the identifier at which the search graph
is extended
+ startId = link.first;
+ currentDepth = sub->getDepth(startId);
+ if(-1 == currentDepth)
{
- // determine the identifier at which the search
graph is extended
- startId = link.first;
+ startId = link.second;
currentDepth = sub->getDepth(startId);
- if(-1 == currentDepth)
- {
- startId = link.second;
- currentDepth = sub->getDepth(startId);
- }
- // qDebug() << "~~making search graph exension for
sub " << sub->_name;
}
+ // qDebug() << "~~making search graph exension for
sub " << sub->_name;
+ }

- // qDebug() << "~~making full search graph rebuild for
sub " << sub->_name;
+ // qDebug() << "~~making full search graph rebuild for
sub " << sub->_name;

- // LFu - remove
- // QTime buildSearchGraphTimer;
- // buildSearchGraphTimer.start();
+ // LFu - remove
+ // QTime buildSearchGraphTimer;
+ // buildSearchGraphTimer.start();

- buildSearchGraph(*sub, startId, (needFullRebuild ?
currentDepth : currentDepth - 1));
- // qDebug() << "~~processing buildSearchGraph() took "
<< buildSearchGraphTimer.elapsed() << " ms for sub " << sub->_name;
+ buildSearchGraph(*sub, startId, (needFullRebuild ?
currentDepth : currentDepth - 1));
+ // qDebug() << "~~processing buildSearchGraph() took " <<
buildSearchGraphTimer.elapsed() << " ms for sub " << sub->_name;

- if (sub->_ids != existingIdList) {
- subIsDirty = true;
- modifiedSearchGraph = true;
+ if (sub->_ids != existingIdList) {
+ subIsDirty = true;
+ modifiedSearchGraph = true;

- // subtract pre-existing IDs from recalculated
search graph: Metadata on these ids are in updateResults
- idsWithConnectedGraphUpdates =
sub->identifiersWithout(existingIdList);
+ // subtract pre-existing IDs from recalculated search
graph: Metadata on these ids are in updateResults
+ idsWithConnectedGraphUpdates =
sub->identifiersWithout(existingIdList);
+ // we need to add all these IDs to the index:
+ QSetIterator<Id> addIt(idsWithConnectedGraphUpdates);
+ while(addIt.hasNext()) {
+
MapSessions::getInstance()->addToIndex(addIt.next(), sub);
+ }

- // subtract recalculated search graph IDs from
pre-existing IDs: Metadata on these ids are in deleteResults
- idsWithConnectedGraphDeletes =
sub->subtractFrom(existingIdList);
+ // subtract recalculated search graph IDs from
pre-existing IDs: Metadata on these ids are in deleteResults
+ idsWithConnectedGraphDeletes =
sub->subtractFrom(existingIdList);
+ // These IDs dropped out of the recalculated search graph, so this
+ // subscription must be removed from their index entries (the previous
+ // code called addToIndex() here, which re-registered the stale IDs).
+ QSetIterator<Id> delIt(idsWithConnectedGraphDeletes);
+ while(delIt.hasNext()) {
+ MapSessions::getInstance()->removeFromIndex(delIt.next(), sub);
+ }

- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__
<< ":sub._linkList.contains(link) && " << "----subscription is dirty with
newIdList.size:" << sub->_ids.size();
- }
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__
<< ":sub._linkList.contains(link) && " << "----subscription is dirty with
newIdList.size:" << sub->_ids.size();
}
+ }

- if (sub->_linkList != existingLinkList) {
- subIsDirty = true;
- modifiedSearchGraph = true;
- // Metadata on these links are in updateResults
- linksWithConnectedGraphUpdates = sub->_linkList -
existingLinkList;
- // Metadata on these links are in deleteResults
- linksWithConnectedGraphDeletes = existingLinkList
- sub->_linkList;
+ if (sub->_linkList != existingLinkList) {
+ subIsDirty = true;
+ modifiedSearchGraph = true;
+ // Metadata on these links are in updateResults
+ linksWithConnectedGraphUpdates = sub->_linkList -
existingLinkList;
+ // Metadata on these links are in deleteResults
+ linksWithConnectedGraphDeletes = existingLinkList -
sub->_linkList;

- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with newLinkList.size:" <<
sub->_linkList.size();
- }
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----subscription is dirty with newLinkList.size:" <<
sub->_linkList.size();
}
}
}
+ }

- if (subIsDirty && !sub->_subscriptionError) {
- // Construct results for the subscription
- if (sub->_requestVersion == MapRequest::IFMAPv11) {
- // Trigger to build and send pollResults
- sub->_sentFirstResult = false;
- } else if (sub->_sentFirstResult && sub->_requestVersion
== MapRequest::IFMAPv20) {
- MapRequest::RequestError error = MapRequest::ErrorNone;
- // Add results from
publish/delete/endSessipublisherHasDirtySubon/purgePublisher (that don't
modify SearchGraph)
- if (!modifiedSearchGraph || publishType ==
Meta::PublishDelete) {
- SearchResult::ResultType resultType =
SearchResult::resultTypeForPublishType(publishType);
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding update/delete results from un-changed SearchGraph";
- }
- if (isLink) {
- addLinkResult(*sub, link, metaChanges,
resultType, error);
- } else {
- addIdentifierResult(*sub, link.first,
metaChanges, resultType, error);
- }
+ if (subIsDirty && !sub->_subscriptionError) {
+ // Construct results for the subscription
+ if (sub->_requestVersion == MapRequest::IFMAPv11) {
+ // Trigger to build and send pollResults
+ sub->_sentFirstResult = false;
+ } else if (sub->_sentFirstResult && sub->_requestVersion ==
MapRequest::IFMAPv20) {
+ MapRequest::RequestError error = MapRequest::ErrorNone;
+ // Add results from
publish/delete/endSession/purgePublisher (that don't
modify SearchGraph)
+ if (!modifiedSearchGraph || publishType ==
Meta::PublishDelete) {
+ SearchResult::ResultType resultType =
SearchResult::resultTypeForPublishType(publishType);
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding update/delete results from un-changed SearchGraph";
}
- // Add results from extending SearchGraph for this
subscription
- if (!idsWithConnectedGraphUpdates.isEmpty() |
| !linksWithConnectedGraphUpdates.isEmpty()) {
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding updateResults from changed SearchGraph";
- }
- addUpdateAndDeleteMetadata(*sub,
SearchResult::UpdateResultType, idsWithConnectedGraphUpdates,
linksWithConnectedGraphUpdates, error);
+ if (isLink) {
+ addLinkResult(*sub, link, metaChanges, resultType,
error);
+ } else {
+ addIdentifierResult(*sub, link.first, metaChanges,
resultType, error);
}
- // Add results from pruning SearchGraph for this
subscription
- if (!idsWithConnectedGraphDeletes.isEmpty() |
| !linksWithConnectedGraphDeletes.isEmpty()) {
- if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
- qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding deleteResults from changed SearchGraph";
- }
- addUpdateAndDeleteMetadata(*sub,
SearchResult::DeleteResultType, idsWithConnectedGraphDeletes,
linksWithConnectedGraphDeletes, error);
+ }
+ // Add results from extending SearchGraph for this
subscription
+ if (!idsWithConnectedGraphUpdates.isEmpty() |
| !linksWithConnectedGraphUpdates.isEmpty()) {
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding updateResults from changed SearchGraph";
}
+ addUpdateAndDeleteMetadata(*sub,
SearchResult::UpdateResultType, idsWithConnectedGraphUpdates,
linksWithConnectedGraphUpdates, error);
}
-
- // LFu: not needed any longer:
- // subIt.setValue(sub);
- // publisherHasDirtySub = true;
+ // Add results from pruning SearchGraph for this
subscription
+ if (!idsWithConnectedGraphDeletes.isEmpty() |
| !linksWithConnectedGraphDeletes.isEmpty()) {
+ if
(_omapdConfig->valueFor("debug_level").value<OmapdConfig::IfmapDebugOptions>().testFlag(OmapdConfig::ShowSearchAlgorithm))
{
+ qDebug() << __PRETTY_FUNCTION__ << ":"
<< "----adding deleteResults from changed SearchGraph";
+ }
+ addUpdateAndDeleteMetadata(*sub,
SearchResult::DeleteResultType, idsWithConnectedGraphDeletes,
linksWithConnectedGraphDeletes, error);
+ }
}
+
+ // LFu: not needed any longer:
+ // subIt.setValue(sub);
+ // publisherHasDirtySub = true;
}
+ // }


// if (publisherHasDirtySub) {
=======================================
--- /branches/lfu/mapsessions.cpp Tue May 28 13:50:58 2013
+++ /branches/lfu/mapsessions.cpp Wed Jun 5 09:18:54 2013
@@ -718,3 +718,18 @@

return isValid;
}
+
+// Returns (by value) the set of subscriptions registered in the index for
+// the given identifier; the set is empty when nothing is registered.
+// Uses QHash::value() instead of operator[] so that a lookup for an
+// identifier that is not indexed does not insert an empty entry into _subs.
+QSet<Subscription*> MapSessions::getSubscriptionsForIdentifier(const Id& id)
+{
+ return _subs.value(id);
+}
+
+// Registers sub in the identifier index under id. The entry for id is
+// created on first use; inserting the same subscription again is a no-op
+// because the entry is a set.
+void MapSessions::addToIndex(const Id& id, Subscription* sub)
+{
+ QSet<Subscription*>& subscribersOfId = _subs[id];
+ subscribersOfId.insert(sub);
+}
+
+// Removes sub from the index entry of id. Does nothing when id has no
+// entry. The entry itself is dropped once its set becomes empty, so the
+// index does not accumulate empty sets for identifiers that no
+// subscription references any longer (operator[] would also have created
+// an empty entry for an unknown id).
+void MapSessions::removeFromIndex(const Id& id, Subscription* sub)
+{
+ QHash<Id, QSet<Subscription*> >::iterator entry = _subs.find(id);
+ if (entry == _subs.end())
+ return;
+
+ entry.value().remove(sub);
+ if (entry.value().isEmpty())
+ _subs.erase(entry);
+}
=======================================
--- /branches/lfu/mapsessions.h Tue May 28 13:50:58 2013
+++ /branches/lfu/mapsessions.h Wed Jun 5 09:18:54 2013
@@ -84,6 +84,12 @@
void removeMapClient(const QString& authToken);
void removeMapClientCA(const QString& authToken);

+ QSet<Subscription*> getSubscriptionsForIdentifier(const Id& id); //
return list of all subscriptions that
+ //
include id in their search graph
+ void addToIndex(const Id& id, Subscription* sub);
+ void removeFromIndex(const Id& id, Subscription* sub);
+
+
private:
MapSessions(QObject *parent = 0);
~MapSessions();
@@ -95,6 +101,7 @@

QHash<QString, MapClient*> _mapClients; // authToken --> MapClient
QHash<QString, MapClient*> _mapClientCAs; // CA AuthToken --> MapClient
+ QHash<Id, QSet<Subscription*> > _subs; // Identifier --> list of
referenced subscriptions
// Registry for published vendor specific metadata cardinalities
QHash<VSM, Meta::Cardinality> _vsmRegistry;

=======================================
--- /branches/lfu/subscription.cpp Fri May 24 23:06:39 2013
+++ /branches/lfu/subscription.cpp Wed Jun 5 09:18:54 2013
@@ -20,6 +20,7 @@
*/

#include "subscription.h"
+#include "mapsessions.h"

SearchResult::ResultType
SearchResult::resultTypeForPublishType(Meta::PublishOperationType
publishType)
{
@@ -53,11 +54,26 @@
_sentFirstResult = false;
_curSize = 0;
_subscriptionError = MapRequest::ErrorNone;
+ _indexed = true;
}

Subscription::~Subscription()
{
// TODO: Do I need to clearSearchResults() to avoid leaking memory?
+
+ // (LFu) You bet: clear the search results on destruction (answers the TODO above)
+ clearSearchResults();
+
+ // also remove this Subscription from the index: one removeFromIndex() call per identifier in _ids. Skipped when _indexed is false, i.e. for temporary subscriptions used in search requests.
+ if(_indexed)
+ {
+ MapSessions* ms = MapSessions::getInstance();
+ QMapIterator<Id, int> mapIt(_ids);
+ while(mapIt.hasNext())
+ {
+ ms->removeFromIndex(mapIt.next().key(), this);
+ }
+ }
}

QSet<Id> Subscription::identifiers() const
=======================================
--- /branches/lfu/subscription.h Fri May 24 23:06:39 2013
+++ /branches/lfu/subscription.h Wed Jun 5 09:18:54 2013
@@ -63,8 +63,11 @@

QString _name;
SearchType _search;
+ bool _indexed; // this is set to false for temporary subscriptions
used in search requests

- QMap<Id, int> _ids;
+ QString _authToken;
+
+ QMap<Id, int> _ids; // id --> search depth
QSet<Id> identifiers() const;
inline void addId(const Id& id, int depth) { if(!_ids.contains(id) ||
_ids[id] > depth) _ids[id] = depth; }
inline int getDepth(const Id& id) const { return (_ids.contains(id) ?
_ids[id] : -1); }
Reply all
Reply to author
Forward
0 new messages