# HG changeset patch
# User Paul Kehrer <paul.l.kehrer@gmail.com>
# Date 1594002572 18000
#      Sun Jul 05 21:29:32 2020 -0500
# Node ID 00ba75d4fc4f2cd9f3e6cbdd7881433fe72c9a29
# Parent  23d01ba79fc6415cc3deb10f496fff646c16486e
support x509 request challenge password parsing (#4944)

* support x509 request challenge password parsing

* switch to a more generic (but not too generic) attribute parsing

* make it raise a valueerror

* Update tests/x509/test_x509.py

Co-authored-by: Alex Gaynor <alex.gaynor@gmail.com>

Co-authored-by: Alex Gaynor <alex.gaynor@gmail.com>

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -42,6 +42,8 @@
   X.509 extension.
 * Added support for parsing
   :class:`~cryptography.x509.SignedCertificateTimestamps` in OCSP responses.
+* Added support for parsing attributes in certificate signing requests via
+  :meth:`~cryptography.x509.CertificateSigningRequest.get_attribute_for_oid`.
 
 .. _v2-9-2:
 
diff --git a/docs/x509/reference.rst b/docs/x509/reference.rst
--- a/docs/x509/reference.rst
+++ b/docs/x509/reference.rst
@@ -894,6 +894,17 @@
         :raises UnicodeError: If an extension contains IDNA encoding that is
             invalid or not compliant with IDNA 2008.
 
+    .. method:: get_attribute_for_oid(oid)
+
+        .. versionadded:: 3.0
+
+        :param oid: An :class:`ObjectIdentifier` instance.
+
+        :returns: The bytes value of the attribute or an exception if not
+            found.
+
+        :raises cryptography.x509.AttributeNotFound: If the request does
+            not have the attribute requested.
 
     .. method:: public_bytes(encoding)
 
@@ -3217,6 +3228,15 @@
 
         Corresponds to the dotted string ``"1.3.6.1.5.5.7.48.1.2"``.
 
+
+.. class:: AttributeOID
+
+    .. versionadded:: 3.0
+
+    .. attribute:: CHALLENGE_PASSWORD
+
+        Corresponds to the dotted string ``"1.2.840.113549.1.9.7"``.
+
 Helper Functions
 ~~~~~~~~~~~~~~~~
 .. currentmodule:: cryptography.x509
@@ -3264,6 +3284,18 @@
 
         Returns the OID.
 
+.. class:: AttributeNotFound
+
+    This is raised when calling
+    :meth:`CertificateSigningRequest.get_attribute_for_oid` with
+    an attribute OID that is not present in the request.
+
+    .. attribute:: oid
+
+        :type: :class:`ObjectIdentifier`
+
+        Returns the OID.
+
 .. class:: UnsupportedGeneralNameType
 
     This is raised when a certificate contains an unsupported general name
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -16,10 +16,11 @@
     _asn1_string_to_bytes, _decode_x509_name, _obj2txt, _parse_asn1_time
 )
 from cryptography.hazmat.backends.openssl.encode_asn1 import (
-    _encode_asn1_int_gc
+    _encode_asn1_int_gc, _txt2obj_gc
 )
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
+from cryptography.x509.name import _ASN1Type
 
 
 @utils.register_interface(x509.Certificate)
@@ -485,6 +486,41 @@
 
         return True
 
+    def get_attribute_for_oid(self, oid):
+        obj = _txt2obj_gc(self._backend, oid.dotted_string)
+        pos = self._backend._lib.X509_REQ_get_attr_by_OBJ(
+            self._x509_req, obj, -1
+        )
+        if pos == -1:
+            raise x509.AttributeNotFound(
+                "No {} attribute was found".format(oid), oid
+            )
+
+        attr = self._backend._lib.X509_REQ_get_attr(self._x509_req, pos)
+        self._backend.openssl_assert(attr != self._backend._ffi.NULL)
+        asn1_type = self._backend._lib.X509_ATTRIBUTE_get0_type(attr, pos)
+        self._backend.openssl_assert(asn1_type != self._backend._ffi.NULL)
+        # We need this to ensure that our C type cast is safe.
+        # Also this should always be a sane string type, but we'll see if
+        # that is true in the real world...
+        if asn1_type.type not in (
+            _ASN1Type.UTF8String.value,
+            _ASN1Type.PrintableString.value,
+            _ASN1Type.IA5String.value,
+        ):
+            raise ValueError("OID {} has a disallowed ASN.1 type: {}".format(
+                oid, asn1_type.type
+            ))
+
+        data = self._backend._lib.X509_ATTRIBUTE_get0_data(
+            attr, pos, asn1_type.type, self._backend._ffi.NULL
+        )
+        self._backend.openssl_assert(data != self._backend._ffi.NULL)
+        # This cast is safe iff we assert on the type above to ensure
+        # that it is always a type of ASN1_STRING
+        data = self._backend._ffi.cast("ASN1_STRING *", data)
+        return _asn1_string_to_bytes(self._backend, data)
+
 
 @utils.register_interface(
     x509.certificate_transparency.SignedCertificateTimestamp
diff --git a/src/cryptography/x509/__init__.py b/src/cryptography/x509/__init__.py
--- a/src/cryptography/x509/__init__.py
+++ b/src/cryptography/x509/__init__.py
@@ -6,8 +6,8 @@
 
 from cryptography.x509 import certificate_transparency
 from cryptography.x509.base import (
-    Certificate, CertificateBuilder, CertificateRevocationList,
-    CertificateRevocationListBuilder,
+    AttributeNotFound, Certificate, CertificateBuilder,
+    CertificateRevocationList, CertificateRevocationListBuilder,
     CertificateSigningRequest, CertificateSigningRequestBuilder,
     InvalidVersion, RevokedCertificate, RevokedCertificateBuilder,
     Version, load_der_x509_certificate, load_der_x509_crl, load_der_x509_csr,
@@ -121,6 +121,7 @@
     "load_pem_x509_crl",
     "load_der_x509_crl",
     "random_serial_number",
+    "AttributeNotFound",
     "InvalidVersion",
     "DeltaCRLIndicator",
     "DuplicateExtension",
diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py
--- a/src/cryptography/x509/base.py
+++ b/src/cryptography/x509/base.py
@@ -22,6 +22,12 @@
 _EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
 
 
+class AttributeNotFound(Exception):
+    def __init__(self, msg, oid):
+        super(AttributeNotFound, self).__init__(msg)
+        self.oid = oid
+
+
 def _reject_duplicate_extension(extension, extensions):
     # This is quadratic in the number of extensions
     for e in extensions:
@@ -367,6 +373,12 @@
         Verifies signature of signing request.
         """
 
+    @abc.abstractproperty
+    def get_attribute_for_oid(self):
+        """
+        Get the attribute value for a given OID.
+        """
+
 
 @six.add_metaclass(abc.ABCMeta)
 class RevokedCertificate(object):
diff --git a/src/cryptography/x509/oid.py b/src/cryptography/x509/oid.py
--- a/src/cryptography/x509/oid.py
+++ b/src/cryptography/x509/oid.py
@@ -162,6 +162,10 @@
     ANY_POLICY = ObjectIdentifier("2.5.29.32.0")
 
 
+class AttributeOID(object):
+    CHALLENGE_PASSWORD = ObjectIdentifier("1.2.840.113549.1.9.7")
+
+
 _OID_NAMES = {
     NameOID.COMMON_NAME: "commonName",
     NameOID.COUNTRY_NAME: "countryName",
@@ -265,4 +269,5 @@
     CertificatePoliciesOID.CPS_QUALIFIER: "id-qt-cps",
     CertificatePoliciesOID.CPS_USER_NOTICE: "id-qt-unotice",
     OCSPExtensionOID.NONCE: "OCSPNonce",
+    AttributeOID.CHALLENGE_PASSWORD: "challengePassword",
 }
diff --git a/tests/x509/test_x509.py b/tests/x509/test_x509.py
--- a/tests/x509/test_x509.py
+++ b/tests/x509/test_x509.py
@@ -1232,6 +1232,45 @@
         assert isinstance(extensions, x509.Extensions)
         assert list(extensions) == []
 
+    def test_get_attribute_for_oid(self, backend):
+        request = _load_cert(
+            os.path.join(
+                "x509", "requests", "challenge.pem"
+            ), x509.load_pem_x509_csr, backend
+        )
+        assert request.get_attribute_for_oid(
+            x509.oid.AttributeOID.CHALLENGE_PASSWORD
+        ) == b"challenge me!"
+
+    def test_invalid_attribute_for_oid(self, backend):
+        """
+        This test deliberately triggers a ValueError because to parse
+        CSR attributes we need to do a C cast. If we're wrong about the
+        type that would be Very Bad so this test confirms we properly explode
+        in the presence of the wrong types.
+        """
+        request = _load_cert(
+            os.path.join(
+                "x509", "requests", "challenge-invalid.der"
+            ), x509.load_der_x509_csr, backend
+        )
+        with pytest.raises(ValueError):
+            request.get_attribute_for_oid(
+                x509.oid.AttributeOID.CHALLENGE_PASSWORD
+            )
+
+    def test_no_challenge_password(self, backend):
+        request = _load_cert(
+            os.path.join(
+                "x509", "requests", "rsa_sha256.pem"
+            ), x509.load_pem_x509_csr, backend
+        )
+        with pytest.raises(x509.AttributeNotFound) as exc:
+            request.get_attribute_for_oid(
+                x509.oid.AttributeOID.CHALLENGE_PASSWORD
+            )
+        assert exc.value.oid == x509.oid.AttributeOID.CHALLENGE_PASSWORD
+
     @pytest.mark.parametrize(
         "loader_func",
         [x509.load_pem_x509_csr, x509.load_der_x509_csr]