#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Contains classes used in the SAML ECP profile
"""
import logging
from saml2.client_base import ACTOR
from saml2.ecp_client import SERVICE
from saml2 import element_to_extension_element
from saml2 import samlp
from saml2 import soap
from saml2 import BINDING_SOAP, BINDING_PAOS
from saml2.profile import paos
from saml2.profile import ecp
#from saml2.client import Saml2Client
from saml2.server import Server
from saml2.schema import soapenv
from saml2.response import authn_response
logger = logging.getLogger(__name__)
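def ecp_capable(headers):
    """Return True if an HTTP request advertises ECP (PAOS) support.

    The client signals ECP capability with two headers: an ``Accept`` header
    that lists ``application/vnd.paos+xml`` and a ``PAOS`` header naming the
    PAOS version (namespace) together with the ECP service URN.

    :param headers: Dict-like mapping of HTTP request header names to their
        string values; names are looked up exactly as given ("Accept",
        "PAOS"), so the caller is responsible for any case normalisation.
    :return: True only when both markers are present; False otherwise,
        including when either header is absent (a missing ``Accept`` no
        longer raises KeyError).
    """
    if "application/vnd.paos+xml" not in headers.get("Accept", ""):
        return False
    # Expected form of the PAOS header value: ver="<paos namespace>";"<ecp service>"
    expected = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)
    return expected in headers.get("PAOS", "")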
#noinspection PyUnusedLocal
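def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
    """Build an ECP authentication request wrapped in a SOAP envelope.

    Written to be bound as a method on an SP client: despite its name,
    ``cls`` is the client instance and must provide ``service_url``,
    ``_sso_location`` and ``create_authn_request``.

    The SOAP header carries a ``<paos:Request>`` and an ``<ecp:RelayState>``;
    the SOAP body carries the ``<samlp:AuthnRequest>``.

    :param cls: The SP client instance the request is built for.
    :param entityid: The entity ID of the IdP to send the request to.
    :param relay_state: Where the user should be returned after a
        successful log in.
    :param sign: Whether the request should be signed or not.
        NOTE(review): this argument is accepted but never forwarded to
        ``create_authn_request``, so it has no effect on the request that
        is produced - confirm whether signing is meant to be driven by the
        client's configuration instead.
    :return: Tuple of (request id, SOAP envelope as a string).
    """
    my_url = cls.service_url(BINDING_PAOS)

    # <paos:Request> - must_understand and actor according to the standard.
    paos_request = paos.Request(must_understand="1", actor=ACTOR,
                                response_consumer_url=my_url,
                                service=SERVICE)

    # No <ecp:Request> (with an IDPList) is emitted; the IdP is selected
    # through ``entityid`` below.

    # <ecp:RelayState> - kept under its own name so the incoming
    # ``relay_state`` string is not shadowed by the element built from it.
    relay_state_element = ecp.RelayState(actor=ACTOR, must_understand="1",
                                         text=relay_state)

    header = soapenv.Header()
    header.extension_elements = [
        element_to_extension_element(paos_request),
        element_to_extension_element(relay_state_element),
    ]

    # <samlp:AuthnRequest> - addressed to the IdP's SOAP SSO endpoint while
    # the SP's own endpoint is its PAOS binding.
    # Lazy %-args: the message is only formatted if INFO is enabled.
    logger.info("entityid: %s, binding: %s", entityid, BINDING_SOAP)
    location = cls._sso_location(entityid, binding=BINDING_SOAP)
    req_id, authn_req = cls.create_authn_request(
        location, binding=BINDING_PAOS, service_url_binding=BINDING_PAOS)

    body = soapenv.Body()
    body.extension_elements = [element_to_extension_element(authn_req)]

    # The SOAP envelope.
    soap_envelope = soapenv.Envelope(header=header, body=body)
    return req_id, "%s" % soap_envelope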
def handle_ecp_authn_response(cls, soap_message, outstanding=None):
    """Parse and verify an ECP authentication response carried in SOAP.

    Written to be bound as a method on an SP client: ``cls`` is the client
    instance and must provide ``config``, ``service_url`` and ``users``.
    The SOAP header is scanned for an ``<ecp:RelayState>`` element, the SOAP
    body is handed to ``authn_response`` for parsing and verification, and
    the resulting session information is stored via ``cls.users``.

    :param cls: The SP client instance.
    :param soap_message: The SOAP message text received from the client.
    :param outstanding: Passed through unchanged to ``authn_response``.
    :return: Tuple of (the response object, the RelayState header element
        or None when the header carries no such element).
    """
    rdict = soap.class_instances_from_soap_enveloped_saml_thingies(
        soap_message, [paos, ecp, samlp])

    # The last matching header element wins, as with a plain scan.
    relay_states = [
        hdr for hdr in rdict["header"]
        if hdr.c_tag == "RelayState" and hdr.c_namespace == ecp.NAMESPACE
    ]
    ecp_relay_state = relay_states[-1] if relay_states else None

    # allow_unsolicited=True presumably admits a response that matches no
    # entry in ``outstanding`` - confirm against saml2.response.
    response = authn_response(cls.config, cls.service_url(), outstanding,
                              allow_unsolicited=True)
    response.loads("%s" % rdict["body"], False, soap_message)
    # NOTE(review): the return value of verify() is discarded here. If it
    # reports failure by returning None rather than raising, an unverified
    # response still reaches session_info() below - confirm the contract in
    # saml2.response before relying on this path.
    response.verify()
    cls.users.add_information_about_person(response.session_info())
    return response, ecp_relay_state
def ecp_response(target_url, response):
    """Wrap a SAML response destined for an ECP client in a SOAP envelope.

    An ``<ecp:Response>`` pointing at *target_url* goes in the SOAP header;
    the given ``<samlp:Response>`` goes in the SOAP body.

    :param target_url: Assertion consumer service URL placed in the
        ``<ecp:Response>`` header element.
    :param response: The SAML response element to carry in the body.
    :return: The SOAP envelope as a string.
    """
    def _with_single_element(container, element):
        # Header and body each carry exactly one extension element.
        container.extension_elements = [element_to_extension_element(element)]
        return container

    ecp_header_element = ecp.Response(assertion_consumer_service_url=target_url)
    header = _with_single_element(soapenv.Header(), ecp_header_element)
    body = _with_single_element(soapenv.Body(), response)
    return "%s" % soapenv.Envelope(header=header, body=body)
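class ECPServer(Server):
    """IdP-side support for the SAML ECP profile.

    TODO: Still tentative - ``parse_ecp_authn_query`` is a stub and
    ``ecp_response`` emits an empty ``<samlp:Response>``.
    """

    def __init__(self, config_file="", config=None, cache=None):
        """Initialise the underlying :class:`Server` with the same arguments."""
        Server.__init__(self, config_file, config, cache)

    def parse_ecp_authn_query(self):
        """Placeholder for parsing an ECP authentication query; not implemented."""
        pass

    def ecp_response(self, target_url=""):
        """Build a SOAP envelope carrying an ``<ecp:Response>`` header.

        :param target_url: Assertion consumer service URL placed in the
            ``<ecp:Response>`` header element. Defaults to "".
        :return: The SOAP envelope as a string, with an empty
            ``<samlp:Response>`` in the body.
        """
        # <ecp:Response> belongs in the SOAP *header*. This used to be built
        # with soapenv.Body(), yielding a second body instead of a header;
        # it now matches the module-level ecp_response().
        ecp_resp = ecp.Response(assertion_consumer_service_url=target_url)
        header = soapenv.Header()
        header.extension_elements = [element_to_extension_element(ecp_resp)]

        # <samlp:Response> - still an empty placeholder.
        body = soapenv.Body()
        body.extension_elements = [
            element_to_extension_element(samlp.Response())]

        soap_envelope = soapenv.Envelope(header=header, body=body)
        return "%s" % soap_envelope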