The originality is to duplicate LDAP data into 'local' tables, just
for identity usage.
- This is a sample, customized for my need and freshly written, you
will have to modify the code for your own use !
- This one create new user at first login
- This one search for groups defined into LDAP and create groups into
SQLObject
- Groups and user data are refreshed at every new loggon
- I don't use User, Groups, Permission for anything else than for
Identity module
- I upgraded field User.user_name to 255 (default 16) to contains
email address
- I need to delete unneeded users (any expired connections) somewhere,
at bootup, but also in a scheduled way.
What is fucking missing into TG is a template or a description of what
is required to write its own s[oa]provider.py.
For example to make a in memory version !
any comment is welcome
import ldap, ldapurl
import soprovider
import sqlobject
import turbogears, cherrypy
import logging
class log:
default=logging.getLogger('emailgency.system.soldapprovider')
system=logging.getLogger('emailgency.system')
class SoLdapIdentityProvider(soprovider.SqlObjectIdentityProvider):
"""
IdentityProvider that uses LDAP for authentication.
"""
def __init__(self):
super(SoLdapIdentityProvider, self).__init__()
get = turbogears.config.get
self.host = get("identity.soldapprovider.host", "localhost")
self.port = get("identity.soldapprovider.port", 389)
self.basedn = get("identity.soldapprovider.basedn",
"dc=localhost")
self.login = get("identity.soldapprovider.login",
"cn=nobody,cn=internal,"+self.basedn)
self.password = get("identity.soldapprovider.password", "")
self.autocreate = get("identity.soldapprovider.autocreate",
False)
# self.filter = get("identity.soldapprovider.filter", "(mail=%
(username)s)")
log.default.info("host :: %s" % self.host)
log.default.info("port :: %d" % self.port)
log.default.info("basedn :: %s" % self.basedn)
log.default.info("login :: %s" % self.login)
log.default.info("password :: %s" % "********")
log.default.info("autocreate :: %s" % self.autocreate)
# log.default.info("filter :: %s" % self.filter)
self.ldap_url=ldapurl.LDAPUrl('ldap://%s:%d/%s' % (self.host,
self.port, self.basedn))
self.ldap_url.applyDefaults({ 'who': self.login, 'cred' :
self.password, })
self.ldap_con=None
def validate_identity( self, user_name, password, visit_key ):
'''
Look up the identity represented by user_name and determine
whether the
password is correct.
Must return either None if the credentials weren't valid or an
object
with the following properties:
user_name: original user name
user: a provider dependant object (TG_User or similar)
groups: a set of group IDs
permissions: a set of permission IDs
'''
# if not self.autocreate:
# return super(SoLdapIdentityProvider,
self).validate_identity(user_name, password, visit_key )
user_name=soprovider.to_db_encoding( user_name,
self.user_class_db_encoding )
user=None
group_lst=self.validate_password(user, user_name, password)
if not group_lst:
log.default.info( "user '%s' or password invalid",
user_name )
return None
try:
user=soprovider.user_class.by_user_name( user_name )
except sqlobject.SQLObjectNotFound:
try:
user=soprovider.user_class(user_name=user_name,
email_address=user_name, display_name=user_name, password='ldap')
except:
log.default.error( "Creating user: %s", user_name )
return None
else:
log.default.info( "user created: %s", user_name )
for gr in user.groups: user.removeGroup(gr)
for gr_name in group_lst:
user.addGroup(soprovider.group_class.by_group_name(gr_name))
log.default.info( 'user "%s"in groups %r', user_name,
group_lst )
# Link the user to the visit
try:
link=soprovider.visit_class.by_visit_key( visit_key )
link.user_id= user.id
except sqlobject.SQLObjectNotFound:
link= soprovider.visit_class( visit_key=visit_key,
user_id=user.id )
return soprovider.SqlObjectIdentity( visit_key, user )
def validate_password( self, user, user_name, password ):
""" Validates user_name and password against an LDAP
directory
and retrieve information about groups and permissions"""
if not self.ldap_con:
try:
self.ldap_con=ldap.ldapobject.ReconnectLDAPObject(self.ldap_url.initial
izeUrl())
self.ldap_con.simple_bind_s(self.ldap_url.who,
self.ldap_url.cred)
except ldap.INVALID_CREDENTIALS:
log.error("invalid ldap credential for %s" %
self.ldap_url.who)
self.ldap_con=None
raise
except ldap.LDAPError:
log.exception('cannot initialize primary ldap
connection: %s', self.ldap_url.initializeUrl())
self.ldap_con=None
raise
filter="(&(uid=%(username)s)(objectclass=kolabInetOrgPerson))"
% { 'username' :user_name } # uid because dom. maintainer doesn't have
any email address
try:
objects=self.ldap_con.search_s(self.basedn,
ldap.SCOPE_SUBTREE, filter)
except ldap.LDAPError:
log.exception('searching ldap for %s', filter)
self.ldap_con=None
raise
if len(objects)==0:
log.warning("No such LDAP user: %s" % user_name)
return False
elif len(objects)>1:
log.error("Too many users: %s" % user_name)
return False
# open a ldap connection as user_name with provided
credentials
dn=objects[0][0]
try:
ldap_url=ldapurl.LDAPUrl('ldap://%s:%d/%s' % (self.host,
self.port, self.basedn))
ldap_url.applyDefaults({ 'who': dn, 'cred' : password, })
ldap_con=ldap.ldapobject.ReconnectLDAPObject(ldap_url.initializeUrl())
ldap_con.simple_bind_s(ldap_url.who, ldap_url.cred)
except ldap.INVALID_CREDENTIALS:
log.error("Invalid password supplied for %s" % user_name)
return False
except ldap.LDAPError:
log.exception('cannot initialize ldap connection with %s',
self.ldap_url.initializeUrl())
raise
# keep connection for further use by user
cherrypy.session['ldap_url']=ldap_url
cherrypy.session['ldap_con']=ldap_con
# Now retriev information about groups and permissions
try:
filter='(member=%s)' % dn
rc=self.ldap_con.search_s(self.ldap_url.dn,
ldap.SCOPE_SUBTREE, filter, ['dn'])
except ldap.LDAPError:
log.error("Error searching groups for %s" % user_name)
self.ldap_con=None
raise
# group is a type and then can contain only one element
group=None
for gr, gr_dn in [ ('admin',
'cn=admin,cn=internal,dc=asxnet,dc=loc') ,
( 'admin',
'cn=maintainer,cn=internal,dc=asxnet,dc=loc') ,
('domain-admin', 'cn=domain-
maintainer,cn=internal,dc=asxnet,dc=loc') ]:
gr_dn=ldap.explode_dn(gr_dn)
for dn, o in rc:
if ldap.explode_dn(dn)==gr_dn:
group=[ gr ]
break
if group:
break
if not group: group=['user']
return group
The code in my previous post is store into /usr/lib/python2.4/site-
packages/TurboGears-1.0-py2.4.egg/turbogears/identity/
soldapprovider.py
I modified a section into /usr/lib/python2.4/site-packages/
TurboGears-1.0-py2.4.egg/EGG-INFO/entry_points.txt
[turbogears.identity.provider]
soldapprovider =
turbogears.identity.soldapprovider:SoLdapIdentityProvider
and
myproject/config/app.cfg
identity.soldapprovider.host = "localhost"
identity.soldapprovider.port = 389
identity.soldapprovider.basedn = "dc=asxnet,dc=loc"
identity.soldapprovider.login =
"cn=nobody,cn=internal,dc=asxnet,dc=loc"
identity.soldapprovider.password="**************"
identity.soldapprovider.autocreate = False
> user=soprovider.user_class.by_user_name ( user_name )
> except sqlobject.SQLObjectNotFound:
> try:
> user=soprovider.user_class(user_name=user_name,
> email_address=user_name, display_name=user_name, password='ldap')
> except:
> log.default.error( "Creating user: %s", user_name )
> return None
> else:
> log.default.info( "user created: %s", user_name )
>
> for gr in user.groups: user.removeGroup(gr)
> for gr_name in group_lst:
> user.addGroup(soprovider.group_class.by_group_name(gr_name))
>
> log.default.info( 'user "%s"in groups %r', user_name,
> group_lst )
>
> # Link the user to the visit
> try:
> link=soprovider.visit_class.by_visit_key( visit_key )
> link.user_id= user.id
> except sqlobject.SQLObjectNotFound:
> link= soprovider.visit_class( visit_key=visit_key,
> user_id=user.id )
> return soprovider.SqlObjectIdentity ( visit_key, user )