Package ldaptor :: Package test :: Module test_server
[hide private]
[frames] | no frames]

Source Code for Module ldaptor.test.test_server

  1  """ 
  2  Test cases for ldaptor.protocols.ldap.ldapserver module. 
  3  """ 
  4   
  5  from twisted.trial import unittest 
  6  import sets, base64 
  7  from twisted.internet import protocol, address 
  8  from twisted.python import components 
  9  from ldaptor import inmemory, interfaces, schema, delta, entry 
 10  from ldaptor.protocols.ldap import ldapserver, ldapclient, ldaperrors, fetchschema 
 11  from ldaptor.protocols import pureldap, pureber 
 12  from twisted.test import proto_helpers 
 13  from ldaptor.test import util, test_schema 
 14   
15 -class LDAPServerTest(unittest.TestCase):
16 - def setUp(self):
17 self.root = inmemory.ReadOnlyInMemoryLDAPEntry( 18 dn='dc=example,dc=com', 19 attributes={ 'dc': 'example', 20 }) 21 self.stuff = self.root.addChild( 22 rdn='ou=stuff', 23 attributes={ 24 'objectClass': ['a', 'b'], 25 'ou': ['stuff'], 26 }) 27 self.thingie = self.stuff.addChild( 28 rdn='cn=thingie', 29 attributes={ 30 'objectClass': ['a', 'b'], 31 'cn': ['thingie'], 32 }) 33 self.another = self.stuff.addChild( 34 rdn='cn=another', 35 attributes={ 36 'objectClass': ['a', 'b'], 37 'cn': ['another'], 38 }) 39 server = ldapserver.LDAPServer() 40 server.factory = self.root 41 server.transport = proto_helpers.StringTransport() 42 server.connectionMade() 43 self.server = server
44
45 - def test_bind(self):
46 self.server.dataReceived(str(pureldap.LDAPMessage(pureldap.LDAPBindRequest(), id=4))) 47 self.assertEquals(self.server.transport.value(), 48 str(pureldap.LDAPMessage(pureldap.LDAPBindResponse(resultCode=0), id=4)))
49
50 - def test_bind_success(self):
51 self.thingie['userPassword'] = ['{SSHA}yVLLj62rFf3kDAbzwEU0zYAVvbWrze8='] # "secret" 52 self.server.dataReceived(str(pureldap.LDAPMessage(pureldap.LDAPBindRequest( 53 dn='cn=thingie,ou=stuff,dc=example,dc=com', 54 auth='secret'), id=4))) 55 self.assertEquals(self.server.transport.value(), 56 str(pureldap.LDAPMessage( 57 pureldap.LDAPBindResponse(resultCode=0, 58 matchedDN='cn=thingie,ou=stuff,dc=example,dc=com'), 59 id=4)))
60
62 self.server.dataReceived(str(pureldap.LDAPMessage( 63 pureldap.LDAPBindRequest(dn='cn=thingie,ou=stuff,dc=example,dc=com', 64 auth='invalid'), 65 id=734))) 66 self.assertEquals(self.server.transport.value(), 67 str(pureldap.LDAPMessage( 68 pureldap.LDAPBindResponse( 69 resultCode=ldaperrors.LDAPInvalidCredentials.resultCode), 70 id=734)))
71
73 self.server.dataReceived(str(pureldap.LDAPMessage( 74 pureldap.LDAPBindRequest(dn='cn=non-existing,dc=example,dc=com', 75 auth='invalid'), 76 id=78))) 77 self.assertEquals(self.server.transport.value(), 78 str(pureldap.LDAPMessage( 79 pureldap.LDAPBindResponse( 80 resultCode=ldaperrors.LDAPInvalidCredentials.resultCode), 81 id=78)))
82
84 self.server.dataReceived(str(pureldap.LDAPMessage( 85 pureldap.LDAPBindRequest(version=1), 86 id=32))) 87 self.assertEquals(self.server.transport.value(), 88 str(pureldap.LDAPMessage( 89 pureldap.LDAPBindResponse( 90 resultCode=ldaperrors.LDAPProtocolError.resultCode, 91 errorMessage='Version 1 not supported'), 92 id=32)))
93
95 self.server.dataReceived(str(pureldap.LDAPMessage( 96 pureldap.LDAPBindRequest(version=2), 97 id=32))) 98 self.assertEquals(self.server.transport.value(), 99 str(pureldap.LDAPMessage( 100 pureldap.LDAPBindResponse( 101 resultCode=ldaperrors.LDAPProtocolError.resultCode, 102 errorMessage='Version 2 not supported'), 103 id=32)))
104
106 self.server.dataReceived(str(pureldap.LDAPMessage( 107 pureldap.LDAPBindRequest(version=4), 108 id=32))) 109 self.assertEquals(self.server.transport.value(), 110 str(pureldap.LDAPMessage( 111 pureldap.LDAPBindResponse( 112 resultCode=ldaperrors.LDAPProtocolError.resultCode, 113 errorMessage='Version 4 not supported'), 114 id=32)))
115
117 # TODO make a test just like this one that would pass authentication 118 # if version was correct, to ensure we don't leak that info either. 119 self.server.dataReceived(str(pureldap.LDAPMessage( 120 pureldap.LDAPBindRequest(version=4, 121 dn='cn=non-existing,dc=example,dc=com', 122 auth='invalid'), 123 id=11))) 124 self.assertEquals(self.server.transport.value(), 125 str(pureldap.LDAPMessage( 126 pureldap.LDAPBindResponse( 127 resultCode=ldaperrors.LDAPProtocolError.resultCode, 128 errorMessage='Version 4 not supported'), 129 id=11)))
130
131 - def test_unbind(self):
132 self.server.dataReceived(str(pureldap.LDAPMessage(pureldap.LDAPUnbindRequest(), id=7))) 133 self.assertEquals(self.server.transport.value(), 134 '')
135
136 - def test_search_outOfTree(self):
137 self.server.dataReceived(str(pureldap.LDAPMessage( 138 pureldap.LDAPSearchRequest( 139 baseObject='dc=invalid', 140 ), id=2))) 141 self.assertEquals(self.server.transport.value(), 142 str(pureldap.LDAPMessage( 143 pureldap.LDAPSearchResultDone(resultCode=ldaperrors.LDAPNoSuchObject.resultCode), 144 id=2)), 145 )
146
148 self.server.dataReceived(str(pureldap.LDAPMessage( 149 pureldap.LDAPSearchRequest( 150 baseObject='cn=thingie,ou=stuff,dc=example,dc=com', 151 ), id=2))) 152 self.assertEquals(self.server.transport.value(), 153 str(pureldap.LDAPMessage( 154 pureldap.LDAPSearchResultEntry( 155 objectName='cn=thingie,ou=stuff,dc=example,dc=com', 156 attributes=[ ('objectClass', ['a', 'b']), 157 ('cn', ['thingie']), 158 ]), 159 id=2)) 160 + str(pureldap.LDAPMessage( 161 pureldap.LDAPSearchResultDone(resultCode=0), 162 id=2)), 163 )
164
166 self.server.dataReceived(str(pureldap.LDAPMessage( 167 pureldap.LDAPSearchRequest( 168 baseObject='ou=stuff,dc=example,dc=com', 169 ), id=2))) 170 self.assertEquals(self.server.transport.value(), 171 str(pureldap.LDAPMessage( 172 pureldap.LDAPSearchResultEntry( 173 objectName='ou=stuff,dc=example,dc=com', 174 attributes=[ ('objectClass', ['a', 'b']), 175 ('ou', ['stuff']), 176 ]), 177 id=2)) 178 + str(pureldap.LDAPMessage( 179 pureldap.LDAPSearchResultEntry( 180 objectName='cn=another,ou=stuff,dc=example,dc=com', 181 attributes=[ ('objectClass', ['a', 'b']), 182 ('cn', ['another']), 183 ]), 184 id=2)) 185 + str(pureldap.LDAPMessage( 186 pureldap.LDAPSearchResultEntry( 187 objectName='cn=thingie,ou=stuff,dc=example,dc=com', 188 attributes=[ ('objectClass', ['a', 'b']), 189 ('cn', ['thingie']), 190 ]), 191 id=2)) 192 + str(pureldap.LDAPMessage( 193 pureldap.LDAPSearchResultDone(resultCode=0), 194 id=2)), 195 )
196
198 self.server.dataReceived(str(pureldap.LDAPMessage( 199 pureldap.LDAPSearchRequest( 200 baseObject='ou=stuff,dc=example,dc=com', 201 scope=pureldap.LDAP_SCOPE_singleLevel, 202 ), id=2))) 203 self.assertEquals(self.server.transport.value(), 204 str(pureldap.LDAPMessage( 205 pureldap.LDAPSearchResultEntry( 206 objectName='cn=thingie,ou=stuff,dc=example,dc=com', 207 attributes=[ ('objectClass', ['a', 'b']), 208 ('cn', ['thingie']), 209 ]), 210 id=2)) 211 + str(pureldap.LDAPMessage( 212 pureldap.LDAPSearchResultEntry( 213 objectName='cn=another,ou=stuff,dc=example,dc=com', 214 attributes=[ ('objectClass', ['a', 'b']), 215 ('cn', ['another']), 216 ]), 217 id=2)) 218 + str(pureldap.LDAPMessage( 219 pureldap.LDAPSearchResultDone(resultCode=0), 220 id=2)), 221 )
222
224 self.server.dataReceived(str(pureldap.LDAPMessage( 225 pureldap.LDAPSearchRequest( 226 baseObject='ou=stuff,dc=example,dc=com', 227 scope=pureldap.LDAP_SCOPE_wholeSubtree, 228 ), id=2))) 229 self.assertEquals(self.server.transport.value(), 230 str(pureldap.LDAPMessage( 231 pureldap.LDAPSearchResultEntry( 232 objectName='ou=stuff,dc=example,dc=com', 233 attributes=[ ('objectClass', ['a', 'b']), 234 ('ou', ['stuff']), 235 ]), 236 id=2)) 237 + str(pureldap.LDAPMessage( 238 pureldap.LDAPSearchResultEntry( 239 objectName='cn=another,ou=stuff,dc=example,dc=com', 240 attributes=[ ('objectClass', ['a', 'b']), 241 ('cn', ['another']), 242 ]), 243 id=2)) 244 + str(pureldap.LDAPMessage( 245 pureldap.LDAPSearchResultEntry( 246 objectName='cn=thingie,ou=stuff,dc=example,dc=com', 247 attributes=[ ('objectClass', ['a', 'b']), 248 ('cn', ['thingie']), 249 ]), 250 id=2)) 251 + str(pureldap.LDAPMessage( 252 pureldap.LDAPSearchResultDone(resultCode=0), 253 id=2)), 254 )
255
257 self.server.dataReceived(str(pureldap.LDAPMessage( 258 pureldap.LDAPSearchRequest( 259 baseObject='ou=stuff,dc=example,dc=com', 260 scope=pureldap.LDAP_SCOPE_baseObject, 261 ), id=2))) 262 self.assertEquals(self.server.transport.value(), 263 str(pureldap.LDAPMessage( 264 pureldap.LDAPSearchResultEntry( 265 objectName='ou=stuff,dc=example,dc=com', 266 attributes=[ ('objectClass', ['a', 'b']), 267 ('ou', ['stuff']), 268 ]), 269 id=2)) 270 + str(pureldap.LDAPMessage( 271 pureldap.LDAPSearchResultDone(resultCode=0), 272 id=2)), 273 )
274
275 - def test_rootDSE(self):
276 self.server.dataReceived(str(pureldap.LDAPMessage( 277 pureldap.LDAPSearchRequest( 278 baseObject='', 279 scope=pureldap.LDAP_SCOPE_baseObject, 280 filter=pureldap.LDAPFilter_present('objectClass'), 281 ), id=2))) 282 self.assertEquals(self.server.transport.value(), 283 str(pureldap.LDAPMessage( 284 pureldap.LDAPSearchResultEntry( 285 objectName='', 286 attributes=[ ('supportedLDAPVersion', ['3']), 287 ('namingContexts', ['dc=example,dc=com']), 288 ('supportedExtension', [ 289 pureldap.LDAPPasswordModifyRequest.oid, 290 ]), 291 ]), 292 id=2)) 293 + str(pureldap.LDAPMessage( 294 pureldap.LDAPSearchResultDone(resultCode=ldaperrors.Success.resultCode), 295 id=2)), 296 )
297
298 - def test_delete(self):
299 self.server.dataReceived(str(pureldap.LDAPMessage( 300 pureldap.LDAPDelRequest(str(self.thingie.dn)), id=2))) 301 self.assertEquals(self.server.transport.value(), 302 str(pureldap.LDAPMessage( 303 pureldap.LDAPDelResponse(resultCode=0), 304 id=2)), 305 ) 306 d = self.stuff.children() 307 d.addCallback(self.assertEquals, [self.another]) 308 return d
309
310 - def test_add_success(self):
311 dn = 'cn=new,ou=stuff,dc=example,dc=com' 312 self.server.dataReceived(str(pureldap.LDAPMessage( 313 pureldap.LDAPAddRequest(entry=dn, 314 attributes=[ 315 (pureldap.LDAPAttributeDescription("objectClass"), 316 pureber.BERSet(value=[ 317 pureldap.LDAPAttributeValue('something'), 318 ])), 319 ]), id=2))) 320 self.assertEquals(self.server.transport.value(), 321 str(pureldap.LDAPMessage( 322 pureldap.LDAPAddResponse( 323 resultCode=ldaperrors.Success.resultCode), 324 id=2)), 325 ) 326 # tree changed 327 d = self.stuff.children() 328 d.addCallback(self.assertEquals, [ 329 self.thingie, 330 self.another, 331 inmemory.ReadOnlyInMemoryLDAPEntry( 332 'cn=new,ou=stuff,dc=example,dc=com', 333 {'objectClass': ['something']}), 334 ]) 335 return d
336
338 self.server.dataReceived(str(pureldap.LDAPMessage( 339 pureldap.LDAPAddRequest(entry=str(self.thingie.dn), 340 attributes=[ 341 (pureldap.LDAPAttributeDescription("objectClass"), 342 pureber.BERSet(value=[ 343 pureldap.LDAPAttributeValue('something'), 344 ])), 345 ]), id=2))) 346 self.assertEquals(self.server.transport.value(), 347 str(pureldap.LDAPMessage( 348 pureldap.LDAPAddResponse( 349 resultCode=ldaperrors.LDAPEntryAlreadyExists.resultCode, 350 errorMessage=str(self.thingie.dn)), 351 id=2)), 352 ) 353 # tree did not change 354 d = self.stuff.children() 355 d.addCallback(self.assertEquals, [self.thingie, self.another]) 356 return d
357
359 newrdn = 'cn=thingamagic' 360 self.server.dataReceived(str(pureldap.LDAPMessage( 361 pureldap.LDAPModifyDNRequest(entry=self.thingie.dn, 362 newrdn=newrdn, 363 deleteoldrdn=True), 364 id=2))) 365 self.assertEquals(self.server.transport.value(), 366 str(pureldap.LDAPMessage( 367 pureldap.LDAPModifyDNResponse( 368 resultCode=ldaperrors.Success.resultCode), 369 id=2)), 370 ) 371 # tree changed 372 d = self.stuff.children() 373 d.addCallback(self.assertEquals, [ 374 inmemory.ReadOnlyInMemoryLDAPEntry( 375 '%s,ou=stuff,dc=example,dc=com' % newrdn, 376 {'objectClass': ['a', 'b'], 377 'cn': ['thingamagic']}), 378 self.another, 379 ]) 380 return d
381
383 newrdn = 'cn=thingamagic' 384 self.server.dataReceived(str(pureldap.LDAPMessage( 385 pureldap.LDAPModifyDNRequest(entry=self.thingie.dn, 386 newrdn=newrdn, 387 deleteoldrdn=False), 388 id=2))) 389 self.assertEquals(self.server.transport.value(), 390 str(pureldap.LDAPMessage( 391 pureldap.LDAPModifyDNResponse( 392 resultCode=ldaperrors.Success.resultCode), 393 id=2)), 394 ) 395 # tree changed 396 d = self.stuff.children() 397 d.addCallback(self.assertEquals, sets.Set([ 398 self.another, 399 inmemory.ReadOnlyInMemoryLDAPEntry( 400 '%s,ou=stuff,dc=example,dc=com' % newrdn, 401 {'objectClass': ['a', 'b'], 402 'cn': ['thingamagic', 'thingie']}), 403 ])) 404 return d
405 test_modifyDN_rdnOnly_noDeleteOldRDN_success.todo = 'Not supported yet.' 406
407 - def test_modify(self):
408 self.server.dataReceived(str(pureldap.LDAPMessage( 409 pureldap.LDAPModifyRequest(self.stuff.dn, 410 modification=[ 411 delta.Add('foo', ['bar']).asLDAP(), 412 ], 413 ), 414 id=2))) 415 self.assertEquals(self.server.transport.value(), 416 str(pureldap.LDAPMessage( 417 pureldap.LDAPModifyResponse( 418 resultCode=ldaperrors.Success.resultCode), 419 id=2)), 420 ) 421 # tree changed 422 self.assertEquals( 423 self.stuff, 424 inmemory.ReadOnlyInMemoryLDAPEntry( 425 'ou=stuff,dc=example,dc=com', 426 {'objectClass': ['a', 'b'], 427 'ou': ['stuff'], 428 'foo': ['bar']}))
429
431 self.server.dataReceived(str(pureldap.LDAPMessage( 432 pureldap.LDAPExtendedRequest(requestName='42.42.42', 433 requestValue='foo'), 434 id=2))) 435 self.assertEquals(self.server.transport.value(), 436 str(pureldap.LDAPMessage( 437 pureldap.LDAPExtendedResponse( 438 resultCode=ldaperrors.LDAPProtocolError.resultCode, 439 errorMessage='Unknown extended request: 42.42.42'), 440 id=2)), 441 )
442
444 self.server.dataReceived(str(pureldap.LDAPMessage( 445 pureldap.LDAPPasswordModifyRequest( 446 userIdentity='cn=thingie,ou=stuff,dc=example,dc=com', 447 newPasswd='hushhush'), 448 id=2))) 449 self.assertEquals(self.server.transport.value(), 450 str(pureldap.LDAPMessage( 451 pureldap.LDAPExtendedResponse( 452 resultCode=ldaperrors.LDAPStrongAuthRequired.resultCode, 453 responseName=pureldap.LDAPPasswordModifyRequest.oid), 454 id=2)), 455 )
456
458 # first bind to some entry 459 self.thingie['userPassword'] = ['{SSHA}yVLLj62rFf3kDAbzwEU0zYAVvbWrze8='] # "secret" 460 self.server.dataReceived(str(pureldap.LDAPMessage(pureldap.LDAPBindRequest( 461 dn='cn=thingie,ou=stuff,dc=example,dc=com', 462 auth='secret'), id=4))) 463 self.assertEquals(self.server.transport.value(), 464 str(pureldap.LDAPMessage( 465 pureldap.LDAPBindResponse(resultCode=0, 466 matchedDN='cn=thingie,ou=stuff,dc=example,dc=com'), 467 id=4))) 468 self.server.transport.clear() 469 470 self.server.dataReceived(str(pureldap.LDAPMessage( 471 pureldap.LDAPPasswordModifyRequest( 472 userIdentity='cn=thingie,ou=stuff,dc=example,dc=com', 473 newPasswd='hushhush'), 474 id=2))) 475 self.assertEquals(self.server.transport.value(), 476 str(pureldap.LDAPMessage( 477 pureldap.LDAPExtendedResponse( 478 resultCode=ldaperrors.Success.resultCode, 479 responseName=pureldap.LDAPPasswordModifyRequest.oid), 480 id=2)), 481 ) 482 # tree changed 483 secrets = self.thingie.get('userPassword', []) 484 self.assertEquals(len(secrets), 1) 485 for secret in secrets: 486 self.assertEquals(secret[:len('{SSHA}')], '{SSHA}') 487 raw = base64.decodestring(secret[len('{SSHA}'):]) 488 salt = raw[20:] 489 self.assertEquals(entry.sshaDigest('hushhush', salt), 490 secret)
491
492 - def test_unknownRequest(self):
493 # make server miss one of the handle_* attributes 494 # without having to modify the LDAPServer class 495 class MockServer(ldapserver.LDAPServer): 496 handle_LDAPBindRequest = property()
497 self.server.__class__ = MockServer 498 self.server.dataReceived(str(pureldap.LDAPMessage( 499 pureldap.LDAPBindRequest(), id=2))) 500 self.assertEquals(self.server.transport.value(), 501 str(pureldap.LDAPMessage( 502 pureldap.LDAPExtendedResponse(resultCode=ldaperrors.LDAPProtocolError.resultCode, 503 responseName='1.3.6.1.4.1.1466.20036', 504 errorMessage='Unknown request'), id=2)))
505
506 - def test_control_unknown_critical(self):
507 self.server.dataReceived(str(pureldap.LDAPMessage( 508 pureldap.LDAPBindRequest(), id=2, 509 controls=[('42.42.42.42', True, None), 510 ]))) 511 self.assertEquals(self.server.transport.value(), 512 str(pureldap.LDAPMessage( 513 pureldap.LDAPBindResponse( 514 resultCode=ldaperrors.LDAPUnavailableCriticalExtension.resultCode, 515 errorMessage='Unknown control 42.42.42.42'), id=2)))
516
517 - def test_control_unknown_nonCritical(self):
518 self.thingie['userPassword'] = ['{SSHA}yVLLj62rFf3kDAbzwEU0zYAVvbWrze8='] # "secret" 519 self.server.dataReceived(str(pureldap.LDAPMessage( 520 pureldap.LDAPBindRequest(dn='cn=thingie,ou=stuff,dc=example,dc=com', 521 auth='secret'), 522 controls=[('42.42.42.42', False, None)], 523 id=4))) 524 self.assertEquals(self.server.transport.value(), 525 str(pureldap.LDAPMessage( 526 pureldap.LDAPBindResponse(resultCode=0, 527 matchedDN='cn=thingie,ou=stuff,dc=example,dc=com'), 528 id=4)))
529 530
class TestSchema(unittest.TestCase):
    """
    Fetch schema information from an in-memory LDAP server through a
    connected LDAPClient and compare it with the known test schema
    values from ldaptor.test.test_schema.
    """

    def setUp(self):
        """
        Build a tree with dc=example,dc=com pointing at cn=schema via
        subschemaSubentry, then connect a fresh LDAPClient to a server
        protocol built by a factory that exposes that tree.
        """
        db = inmemory.ReadOnlyInMemoryLDAPEntry('', {})
        com = db.addChild('dc=com',
                          {'objectClass': ['dcObject'],
                           'dc': ['com'],
                           })
        com.addChild('dc=example',
                     {'objectClass': ['dcObject'],
                      'dc': ['example'],
                      'subschemaSubentry': ['cn=schema'],
                      })
        db.addChild('cn=schema',
                    {'objectClass': ['TODO'],
                     'cn': ['schema'],
                     'attributeTypes': [test_schema.AttributeType_KnownValues.knownValues[0][0]],
                     'objectClasses': [test_schema.OBJECTCLASSES['organization'],
                                       test_schema.OBJECTCLASSES['organizationalUnit'],
                                       ],
                     })

        class LDAPServerFactory(protocol.ServerFactory):
            protocol = ldapserver.LDAPServer

            def __init__(self, root):
                self.root = root

        # Let the factory be adapted to IConnectedLDAPEntry by handing
        # out its root entry.
        components.registerAdapter(lambda x: x.root,
                                   LDAPServerFactory,
                                   interfaces.IConnectedLDAPEntry)
        serverFactory = LDAPServerFactory(db)

        self.client = ldapclient.LDAPClient()
        # The port is an integer; the original passed the string '1024'.
        server = serverFactory.buildProtocol(
            address.IPv4Address('TCP', 'localhost', 1024))
        util.returnConnected(server, self.client)

    def testSimple(self):
        """
        fetchschema.fetch() for dc=example,dc=com must return the
        attribute type and the two object classes stored under
        cn=schema, compared by their string forms.
        """
        d = fetchschema.fetch(self.client, 'dc=example,dc=com')
        (attributeTypes, objectClasses) = util.pumpingDeferredResult(d)

        self.failUnlessEqual([str(x) for x in attributeTypes],
                             [str(schema.AttributeTypeDescription(x)) for x in [
            test_schema.AttributeType_KnownValues.knownValues[0][0],
            ]])

        self.failUnlessEqual([str(x) for x in objectClasses],
                             [str(schema.ObjectClassDescription(x)) for x in [
            test_schema.OBJECTCLASSES['organization'],
            test_schema.OBJECTCLASSES['organizationalUnit'],
            ]])
580