another soldapprovider.py : including user autocreate and group management

11 views
Skip to first unread message

aspineux

unread,
Jan 29, 2007, 4:30:11 AM1/29/07
to TurboGears
Hello, I wrote yet another version of soldapprovider.py.

The originality is to duplicate LDAP data into 'local' tables, just
for identity usage.

- This is a sample, customized for my needs and freshly written; you
will have to modify the code for your own use!
- It creates a new user at first login.
- It searches for the groups defined in LDAP and creates the groups in
SQLObject.
- Group and user data are refreshed at every new login.
- I don't use User, Group and Permission for anything other than the
Identity module.
- I enlarged the field User.user_name to 255 characters (default 16) so
that it can contain an email address.
- I still need to delete unneeded users (those with expired connections)
somewhere: at boot-up, but also on a schedule.

What is sorely missing in TG is a template, or a description, of what
is required to write your own s[oa]provider.py.
For example, to make an in-memory version!

any comment is welcome

import ldap, ldapurl

import soprovider
import sqlobject
import turbogears, cherrypy

import logging

class log:
    """Namespace grouping the loggers used by this module.

    This is not a logger itself: always go through one of the attributes
    (``log.default.info(...)``, ``log.system.error(...)``).
    """

    # Logger for messages emitted by the LDAP identity provider.
    default = logging.getLogger('emailgency.system.soldapprovider')
    # Parent, application-wide system logger.
    system = logging.getLogger('emailgency.system')


class SoLdapIdentityProvider(soprovider.SqlObjectIdentityProvider):
    """
    IdentityProvider that uses LDAP for authentication.

    Credentials are checked against an LDAP directory. On every successful
    login the user (created on first login) and its group membership are
    copied into the local SQLObject identity tables.
    """

    # Default LDAP filter used to locate a user entry. ``uid`` is used rather
    # than ``mail`` because domain maintainers don't have any email address.
    DEFAULT_USER_FILTER = \
        "(&(uid=%(username)s)(objectclass=kolabInetOrgPerson))"

    # (identity group name, LDAP group DN) pairs, in priority order: the first
    # LDAP group the user is a member of decides the single identity group.
    GROUP_DNS = (
        ('admin', 'cn=admin,cn=internal,dc=asxnet,dc=loc'),
        ('admin', 'cn=maintainer,cn=internal,dc=asxnet,dc=loc'),
        ('domain-admin', 'cn=domain-maintainer,cn=internal,dc=asxnet,dc=loc'),
    )

    def __init__(self):
        """Read the ``identity.soldapprovider.*`` settings and prepare the
        service-account LDAP URL. No connection is opened here."""
        super(SoLdapIdentityProvider, self).__init__()
        get = turbogears.config.get

        self.host = get("identity.soldapprovider.host", "localhost")
        # Coerce so that a port given as a string in the config still works
        # with the '%d' URL template below.
        self.port = int(get("identity.soldapprovider.port", 389))
        self.basedn = get("identity.soldapprovider.basedn", "dc=localhost")
        self.login = get("identity.soldapprovider.login",
                         "cn=nobody,cn=internal," + self.basedn)
        self.password = get("identity.soldapprovider.password", "")
        # NOTE(review): this flag is read and logged but never consulted;
        # users are created at first login regardless of its value.
        self.autocreate = get("identity.soldapprovider.autocreate", False)
        # Must contain a '%(username)s' placeholder.
        self.filter = get("identity.soldapprovider.filter",
                          self.DEFAULT_USER_FILTER)

        log.default.info("host :: %s", self.host)
        log.default.info("port :: %d", self.port)
        log.default.info("basedn :: %s", self.basedn)
        log.default.info("login :: %s", self.login)
        log.default.info("password :: %s", "********")
        log.default.info("autocreate :: %s", self.autocreate)
        log.default.info("filter :: %s", self.filter)

        self.ldap_url = ldapurl.LDAPUrl(
            'ldap://%s:%d/%s' % (self.host, self.port, self.basedn))
        self.ldap_url.applyDefaults({'who': self.login,
                                     'cred': self.password})
        # Service-account connection, opened lazily on first use.
        self.ldap_con = None

    def validate_identity(self, user_name, password, visit_key):
        '''
        Look up the identity represented by user_name and determine whether
        the password is correct.

        Must return either None if the credentials weren't valid or an
        object with the following properties:
            user_name: original user name
            user: a provider dependant object (TG_User or similar)
            groups: a set of group IDs
            permissions: a set of permission IDs

        LDAP errors raised by validate_password() propagate to the caller.
        '''
        user_name = soprovider.to_db_encoding(user_name,
                                              self.user_class_db_encoding)
        user = None
        group_lst = self.validate_password(user, user_name, password)
        if not group_lst:
            log.default.info("user '%s' or password invalid", user_name)
            return None

        try:
            user = soprovider.user_class.by_user_name(user_name)
        except sqlobject.SQLObjectNotFound:
            # First login: mirror the LDAP user into the local table. The
            # stored password is a placeholder; LDAP does the real check.
            try:
                user = soprovider.user_class(user_name=user_name,
                                             email_address=user_name,
                                             display_name=user_name,
                                             password='ldap')
            except Exception:
                # Keep the traceback: a bare message hides the real cause.
                log.default.exception("Creating user: %s", user_name)
                return None
            else:
                log.default.info("user created: %s", user_name)

        # Refresh group membership from LDAP. Iterate over a copy because
        # the membership is modified inside the loop.
        for gr in list(user.groups):
            user.removeGroup(gr)
        for gr_name in group_lst:
            try:
                group = soprovider.group_class.by_group_name(gr_name)
            except sqlobject.SQLObjectNotFound:
                # Assumes the group model accepts group_name/display_name
                # like the stock TurboGears model -- TODO confirm for
                # customized models.
                group = soprovider.group_class(group_name=gr_name,
                                               display_name=gr_name)
                log.default.info("group created: %s", gr_name)
            user.addGroup(group)

        log.default.info('user "%s"in groups %r', user_name, group_lst)

        # Link the user to the visit
        try:
            link = soprovider.visit_class.by_visit_key(visit_key)
            link.user_id = user.id
        except sqlobject.SQLObjectNotFound:
            link = soprovider.visit_class(visit_key=visit_key,
                                          user_id=user.id)
        return soprovider.SqlObjectIdentity(visit_key, user)

    def validate_password(self, user, user_name, password):
        """Validate user_name and password against the LDAP directory and
        retrieve the user's identity groups.

        ``user`` is accepted for interface compatibility and is not used.

        Returns False when the user is unknown, ambiguous, or the password
        is wrong; otherwise a one-element list holding the identity group
        name ('user' when no privileged LDAP group matches).

        Raises ldap.LDAPError on directory failures other than invalid
        user credentials.
        """
        # Local import: 'import ldap' alone is not relied upon to load the
        # ldap.filter submodule.
        from ldap.filter import escape_filter_chars

        # A simple bind with an empty password is an *unauthenticated* bind
        # which servers may accept; never let it count as a valid login.
        if not password:
            log.default.warning("Empty password supplied for %s", user_name)
            return False

        con = self._service_connection()

        # Escape the user-supplied name to prevent LDAP filter injection.
        user_filter = self.filter % {
            'username': escape_filter_chars(user_name)}
        try:
            objects = con.search_s(self.basedn, ldap.SCOPE_SUBTREE,
                                   user_filter)
        except ldap.LDAPError:
            log.default.exception('searching ldap for %s', user_filter)
            self.ldap_con = None
            raise

        # Ignore referral entries, whose DN is None.
        objects = [entry for entry in objects if entry[0]]
        if not objects:
            log.default.warning("No such LDAP user: %s", user_name)
            return False
        if len(objects) > 1:
            log.default.error("Too many users: %s", user_name)
            return False

        # open a ldap connection as user_name with provided credentials
        dn = objects[0][0]
        user_url = ldapurl.LDAPUrl(
            'ldap://%s:%d/%s' % (self.host, self.port, self.basedn))
        user_url.applyDefaults({'who': dn, 'cred': password})
        try:
            user_con = ldap.ldapobject.ReconnectLDAPObject(
                user_url.initializeUrl())
            user_con.simple_bind_s(user_url.who, user_url.cred)
        except ldap.INVALID_CREDENTIALS:
            log.default.error("Invalid password supplied for %s", user_name)
            return False
        except ldap.LDAPError:
            log.default.exception(
                'cannot initialize ldap connection with %s',
                user_url.initializeUrl())
            raise

        # keep connection for further use by user
        cherrypy.session['ldap_url'] = user_url
        cherrypy.session['ldap_con'] = user_con

        # Now retrieve information about groups and permissions
        member_filter = '(member=%s)' % escape_filter_chars(dn)
        try:
            rc = con.search_s(self.ldap_url.dn, ldap.SCOPE_SUBTREE,
                              member_filter, ['dn'])
        except ldap.LDAPError:
            log.default.exception("Error searching groups for %s", user_name)
            self.ldap_con = None
            raise

        return self._identity_groups(rc)

    def _service_connection(self):
        """Return the shared service-account connection, opening and binding
        it on first use. Re-raises ldap.LDAPError after logging."""
        if self.ldap_con:
            return self.ldap_con
        try:
            con = ldap.ldapobject.ReconnectLDAPObject(
                self.ldap_url.initializeUrl())
            con.simple_bind_s(self.ldap_url.who, self.ldap_url.cred)
        except ldap.INVALID_CREDENTIALS:
            log.default.error("invalid ldap credential for %s",
                              self.ldap_url.who)
            self.ldap_con = None
            raise
        except ldap.LDAPError:
            log.default.exception(
                'cannot initialize primary ldap connection: %s',
                self.ldap_url.initializeUrl())
            self.ldap_con = None
            raise
        self.ldap_con = con
        return con

    def _identity_groups(self, group_entries):
        """Map LDAP group search results to a one-element list holding the
        identity group name."""
        # group is a type and then can contain only one element
        member_dns = [ldap.explode_dn(entry_dn)
                      for entry_dn, _attrs in group_entries
                      if entry_dn]
        for gr, gr_dn in self.GROUP_DNS:
            if ldap.explode_dn(gr_dn) in member_dns:
                return [gr]
        return ['user']

aspineux

unread,
Jan 29, 2007, 4:53:53 AM1/29/07
to TurboGears
To be complete.

The code in my previous post is stored in
/usr/lib/python2.4/site-packages/TurboGears-1.0-py2.4.egg/turbogears/identity/soldapprovider.py

I modified a section in
/usr/lib/python2.4/site-packages/TurboGears-1.0-py2.4.egg/EGG-INFO/entry_points.txt

[turbogears.identity.provider]
soldapprovider =
turbogears.identity.soldapprovider:SoLdapIdentityProvider

and

myproject/config/app.cfg

identity.soldapprovider.host = "localhost"
identity.soldapprovider.port = 389
identity.soldapprovider.basedn = "dc=asxnet,dc=loc"
identity.soldapprovider.login =
"cn=nobody,cn=internal,dc=asxnet,dc=loc"
identity.soldapprovider.password="**************"
identity.soldapprovider.autocreate = False

Jorge Vargas

unread,
Jan 30, 2007, 11:33:14 PM1/30/07
to turbo...@googlegroups.com
May I suggest you refactor this into an identity plugin? If not, then at least post it on docs.turbogears.org.

>             user=soprovider.user_class.by_user_name ( user_name )

>         except sqlobject.SQLObjectNotFound:
>             try:
>                 user=soprovider.user_class(user_name=user_name,
> email_address=user_name, display_name=user_name, password='ldap')
>             except:
>                 log.default.error( "Creating user: %s", user_name )
>                 return None
>             else:
>                 log.default.info( "user created: %s", user_name )
>
>         for gr in user.groups: user.removeGroup(gr)
>         for gr_name in group_lst:
> user.addGroup(soprovider.group_class.by_group_name(gr_name))
>
>         log.default.info( 'user "%s"in groups %r', user_name,
> group_lst )
>
>         # Link the user to the visit
>         try:
>             link=soprovider.visit_class.by_visit_key( visit_key )
>             link.user_id= user.id
>         except sqlobject.SQLObjectNotFound:
>             link= soprovider.visit_class( visit_key=visit_key,
> user_id=user.id )
>         return soprovider.SqlObjectIdentity ( visit_key, user )
Reply all
Reply to author
Forward
0 new messages