diff --git a/pika/connection.py b/pika/connection.py
index c4877bdc36bb06969ae83f73ba37f3775d08606d_cGlrYS9jb25uZWN0aW9uLnB5..661d12f266887b26cb3d2fcf6e96de1fbff79d5b_cGlrYS9jb25uZWN0aW9uLnB5 100644
--- a/pika/connection.py
+++ b/pika/connection.py
@@ -955,57 +955,17 @@
                     'Specified ssl_options=None URL arg is inconsistent with '
                     'the specified https URL scheme.')
         else:
-            # Note: this is the deprecated wrap_socket signature and info:
-            #
-            # Internally, function creates a SSLContext with protocol
-            # ssl_version and SSLContext.options set to cert_reqs.
-            # If parameters keyfile, certfile, ca_certs or ciphers are set,
-            # then the values are passed to SSLContext.load_cert_chain(),
-            # SSLContext.load_verify_locations(), and SSLContext.set_ciphers().
-            #
-            # ssl.wrap_socket(sock,
-            #     keyfile=None,
-            #     certfile=None,
-            #     server_side=False,        # Not URL-supported
-            #     cert_reqs=CERT_NONE,      # Not URL-supported
-            #     ssl_version=PROTOCOL_TLS, # Not URL-supported
-            #     ca_certs=None,
-            #     do_handshake_on_connect=True, # Not URL-supported
-            #     suppress_ragged_eofs=True,    # Not URL-supported
-            #     ciphers=None
-            cxt = None
-            if 'ca_certs' in opts:
-                opt_ca_certs = opts['ca_certs']
-                if os.path.isfile(opt_ca_certs):
-                    cxt = ssl.create_default_context(cafile=opt_ca_certs)
-                elif os.path.isdir(opt_ca_certs):
-                    cxt = ssl.create_default_context(capath=opt_ca_certs)
-                else:
-                    LOGGER.warning('ca_certs is specified via ssl_options but '
-                                   'is neither a valid file nor directory: "%s"',
-                                   opt_ca_certs)
-
-            if 'certfile' in opts:
-                if os.path.isfile(opts['certfile']):
-                    keyfile = opts.get('keyfile')
-                    password = opts.get('password')
-                    cxt.load_cert_chain(opts['certfile'], keyfile, password)
-                else:
-                    LOGGER.warning('certfile is specified via ssl_options but '
-                                   'is not a valid file: "%s"',
-                                   opts['certfile'])
-
-            if 'ciphers' in opts:
-                opt_ciphers = opts['ciphers']
-                if opt_ciphers is not None:
-                    cxt.set_ciphers(opt_ciphers)
-                else:
-                    LOGGER.warning('ciphers specified in ssl_options but '
-                                   'evaluates to None')
-
-            server_hostname = opts.get('server_hostname')
-            self.ssl_options = pika.SSLOptions(context=cxt,
-                                               server_hostname=server_hostname)
+            self.ssl_options = pika.SSLOptions(
+                keyfile=opts.get('keyfile'),
+                key_password=opts.get('key_password') or opts.get('password'),
+                certfile=opts.get('certfile'),
+                verify_mode=opts.get('verify_mode') or ssl.CERT_NONE,
+                ssl_version=opts.get('ssl_version') or ssl.PROTOCOL_TLSv1,
+                cafile=opts.get('cafile'),
+                capath=opts.get('capath'),
+                cadata=opts.get('cadata'),
+                ciphers=opts.get('ciphers'),
+                server_hostname=opts.get('server_hostname'))
 
     def _set_url_tcp_options(self, value):
         """Deserialize and apply the corresponding query string arg"""
@@ -1042,7 +1002,7 @@
                  certfile=None,
                  server_side=False,
                  verify_mode=ssl.CERT_NONE,
-                 ssl_version=ssl.PROTOCOL_SSLv23,
+                 ssl_version=ssl.PROTOCOL_TLSv1,
                  cafile=None,
                  capath=None,
                  cadata=None,
diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py
index c4877bdc36bb06969ae83f73ba37f3775d08606d_dGVzdHMvYWNjZXB0YW5jZS9hc3luY190ZXN0X2Jhc2UucHk=..661d12f266887b26cb3d2fcf6e96de1fbff79d5b_dGVzdHMvYWNjZXB0YW5jZS9hc3luY190ZXN0X2Jhc2UucHk= 100644
--- a/tests/acceptance/async_test_base.py
+++ b/tests/acceptance/async_test_base.py
@@ -24,6 +24,13 @@
 from pika.adapters import select_connection
 
 
+def enable_tls():
+    if 'PIKA_TEST_TLS' in os.environ and \
+            os.environ['PIKA_TEST_TLS'].lower() == 'true':
+        return True
+    return False
+
+
 class AsyncTestCase(unittest.TestCase):
     DESCRIPTION = ""
     ADAPTER = None
@@ -31,8 +38,5 @@
 
     def setUp(self):
         self.logger = logging.getLogger(self.__class__.__name__)
-        self.parameters = pika.ConnectionParameters(
-            host='localhost',
-            port=5672)
-        if self.should_test_tls():
+        if enable_tls():
             self.logger.info('testing using TLS/SSL connection to port 5671')
@@ -38,12 +42,8 @@
             self.logger.info('testing using TLS/SSL connection to port 5671')
-            self.parameters.port = 5671
-            self.parameters.ssl = True
-            self.parameters.ssl_options = dict(
-                ssl_version=ssl.PROTOCOL_TLSv1,
-                ca_certs="testdata/certs/ca_certificate.pem",
-                keyfile="testdata/certs/client_key.pem",
-                certfile="testdata/certs/client_certificate.pem",
-                cert_reqs=ssl.CERT_REQUIRED)
+            url = 'amqps://localhost:5671/%2F?ssl_options=%7B%27ca_certs%27%3A%27testdata%2Fcerts%2Fca_certificate.pem%27%2C%27keyfile%27%3A%27testdata%2Fcerts%2Fclient_key.pem%27%2C%27certfile%27%3A%27testdata%2Fcerts%2Fclient_certificate.pem%27%7D'
+            self.parameters = pika.URLParameters(url)
+        else:
+            self.parameters = pika.ConnectionParameters(host='localhost', port=5672)
         self._timed_out = False
         super(AsyncTestCase, self).setUp()
 
@@ -47,13 +47,6 @@
         self._timed_out = False
         super(AsyncTestCase, self).setUp()
 
-    @staticmethod
-    def should_test_tls():
-        if 'PIKA_TEST_TLS' in os.environ and \
-                os.environ['PIKA_TEST_TLS'].lower() == 'true':
-            return True
-        return False
-
     def tearDown(self):
         self._stop()