-
Notifications
You must be signed in to change notification settings - Fork 2
Sourcery refactored main branch #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,9 @@ | |
| def parseArguments(): | ||
| """Create argument options and parse through them to determine what to do with script.""" | ||
| # Instantiate the parser | ||
| parser = argparse.ArgumentParser(description='Get Certificate Chain v' + scriptVersion) | ||
| parser = argparse.ArgumentParser( | ||
| description=f'Get Certificate Chain v{scriptVersion}' | ||
| ) | ||
|
|
||
| # Optional arguments | ||
| parser.add_argument('--hostname', default='www.google.com:443', | ||
|
|
@@ -136,11 +138,7 @@ def normalizeSubject(__subject: str) -> str: | |
| # Remove wildcards | ||
| commonName = commonName.replace('*.', '') | ||
|
|
||
| # Make sure the filename string is lower case | ||
| newNormalizedName = ''.join(commonName).lower() | ||
|
|
||
| # Return newNormalizedName | ||
| return newNormalizedName | ||
| return ''.join(commonName).lower() | ||
|
Comment on lines
-139
to
+141
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ): |
||
|
|
||
|
|
||
| def getCertificate(__hostname: str, __port: int) -> x509.Certificate: | ||
|
|
@@ -207,9 +205,9 @@ def returnCertAKI(__sslCertificate: x509.Certificate) -> x509.extensions.Extensi | |
|
|
||
| def returnCertSKI(__sslCertificate: x509.Certificate) -> x509.extensions.Extension: | ||
| """Returns the SKI of the certificate.""" | ||
| certSKI = __sslCertificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) | ||
|
|
||
| return certSKI | ||
| return __sslCertificate.extensions.get_extension_for_oid( | ||
| ExtensionOID.SUBJECT_KEY_IDENTIFIER | ||
| ) | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
|
||
|
|
||
| def returnCertAIA(__sslCertificate: x509.Certificate) -> x509.extensions.Extension: | ||
|
|
@@ -233,11 +231,12 @@ def returnCertAIAList(__sslCertificate: x509.Certificate) -> list: | |
|
|
||
| # If the extension is x509.AuthorityInformationAccess) then lets get the caIssuers from the field. | ||
| if isinstance(certValue, x509.AuthorityInformationAccess): | ||
| dataAIA = [x for x in certValue or []] | ||
| for item in dataAIA: | ||
| if item.access_method._name == "caIssuers": | ||
| aiaUriList.append(item.access_location._value) | ||
|
|
||
| dataAIA = list(certValue or []) | ||
| aiaUriList.extend( | ||
| item.access_location._value | ||
| for item in dataAIA | ||
| if item.access_method._name == "caIssuers" | ||
| ) | ||
|
Comment on lines
-236
to
+239
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
| # Return the aiaUriList back to the script. | ||
| return aiaUriList | ||
|
|
||
|
|
@@ -250,67 +249,64 @@ def walkTheChain(__sslCertificate: x509.Certificate, __depth: int) -> None: | |
| This is to prevent recursive loops. Usually there are only 4 certificates. | ||
| If the maxDepth is too small (why?) adjust it at the beginning of the script. | ||
| """ | ||
| if __depth <= maxDepth: | ||
| # Retrive the AKI from the certificate. | ||
| certAKI = returnCertAKI(__sslCertificate) | ||
| # Retrieve the SKI from the certificate. | ||
| certSKI = returnCertSKI(__sslCertificate) | ||
| if __depth > maxDepth: | ||
| return | ||
| # Retrive the AKI from the certificate. | ||
| certAKI = returnCertAKI(__sslCertificate) | ||
| # Retrieve the SKI from the certificate. | ||
| certSKI = returnCertSKI(__sslCertificate) | ||
|
|
||
| # Sometimes the AKI can be none. Lets handle this accordingly. | ||
| if certAKI is not None: | ||
| certAKIValue = certAKI._value.key_identifier | ||
| else: | ||
| certAKIValue = None | ||
|
|
||
| # Get the value of the SKI from certSKI | ||
| certSKIValue = certSKI._value.digest | ||
|
|
||
| # Sometimes the AKI can be none. Lets handle this accordingly. | ||
| if certAKIValue is not None: | ||
| aiaUriList = returnCertAIAList(__sslCertificate) | ||
| if aiaUriList != []: | ||
| # Iterate through the aiaUriList list. | ||
| for item in aiaUriList: | ||
| # get the certificate for the item element. | ||
| nextCert = getCertificateFromUri(item) | ||
|
|
||
| # If the certificate is not none (great), append it to the certChain, increase the __depth and run the walkTheChain subroutine again. | ||
| if nextCert is not None: | ||
| certChain.append(nextCert) | ||
| __depth += 1 | ||
| walkTheChain(nextCert, __depth) | ||
| else: | ||
| print("Could not retrieve certificate.") | ||
| sys.exit(1) | ||
| else: | ||
| """Now we have to go on a hunt to find the root from a standard root store.""" | ||
| print("Certificate didn't have AIA...ruh roh.") | ||
|
|
||
| # Load the Root CA Cert Chain. | ||
| caRootStore = loadRootCACertChain("cacert.pem") | ||
|
|
||
| # Assume we cannot find a Root CA | ||
| rootCACN = None | ||
|
|
||
| # Iterate through the caRootStore object. | ||
| for rootCA in caRootStore: | ||
| try: | ||
| rootCACertificatePEM = caRootStore[rootCA] | ||
| rootCACertificate = x509.load_pem_x509_certificate(rootCACertificatePEM.encode('ascii')) | ||
| rootCASKI = returnCertSKI(rootCACertificate) | ||
| rootCASKI_Value = rootCASKI._value.digest | ||
| if rootCASKI_Value == certAKIValue: | ||
| rootCACN = rootCA | ||
| print(f"Root CA Found - {rootCACN}") | ||
| certChain.append(rootCACertificate) | ||
| break | ||
| except x509.extensions.ExtensionNotFound: | ||
| # Apparently some Root CA's don't have a SKI? | ||
| pass | ||
|
|
||
| if rootCACN is None: | ||
| print("ERROR - Root CA NOT found.") | ||
| certAKIValue = certAKI._value.key_identifier if certAKI is not None else None | ||
| # Get the value of the SKI from certSKI | ||
| certSKIValue = certSKI._value.digest | ||
|
|
||
| # Sometimes the AKI can be none. Lets handle this accordingly. | ||
| if certAKIValue is not None: | ||
| aiaUriList = returnCertAIAList(__sslCertificate) | ||
| if aiaUriList != []: | ||
| # Iterate through the aiaUriList list. | ||
| for item in aiaUriList: | ||
| # get the certificate for the item element. | ||
| nextCert = getCertificateFromUri(item) | ||
|
|
||
| # If the certificate is not none (great), append it to the certChain, increase the __depth and run the walkTheChain subroutine again. | ||
| if nextCert is not None: | ||
| certChain.append(nextCert) | ||
| __depth += 1 | ||
| walkTheChain(nextCert, __depth) | ||
| else: | ||
| print("Could not retrieve certificate.") | ||
| sys.exit(1) | ||
| else: | ||
| """Now we have to go on a hunt to find the root from a standard root store.""" | ||
| print("Certificate didn't have AIA...ruh roh.") | ||
|
|
||
| # Load the Root CA Cert Chain. | ||
| caRootStore = loadRootCACertChain("cacert.pem") | ||
|
|
||
| # Assume we cannot find a Root CA | ||
| rootCACN = None | ||
|
|
||
| # Iterate through the caRootStore object. | ||
| for rootCA in caRootStore: | ||
| try: | ||
| rootCACertificatePEM = caRootStore[rootCA] | ||
| rootCACertificate = x509.load_pem_x509_certificate(rootCACertificatePEM.encode('ascii')) | ||
| rootCASKI = returnCertSKI(rootCACertificate) | ||
| rootCASKI_Value = rootCASKI._value.digest | ||
| if rootCASKI_Value == certAKIValue: | ||
| rootCACN = rootCA | ||
| print(f"Root CA Found - {rootCACN}") | ||
| certChain.append(rootCACertificate) | ||
| break | ||
| except x509.extensions.ExtensionNotFound: | ||
| # Apparently some Root CA's don't have a SKI? | ||
| pass | ||
|
|
||
| if rootCACN is None: | ||
| print("ERROR - Root CA NOT found.") | ||
| sys.exit(1) | ||
|
Comment on lines
-253
to
+309
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
|
||
|
|
||
| def sendCertificateToFile(__filename: str, __sslCertificate: x509.Certificate) -> None: | ||
|
|
@@ -334,7 +330,7 @@ def writeChainToFile(__certificateChain: list) -> None: | |
| normalizedSubject = normalizeSubject(certSubject) | ||
|
|
||
| # Generate the certificate file name | ||
| sslCertificateFilename = str(len(__certificateChain) - 1 - counter) + '-' + normalizedSubject + '.crt' | ||
| sslCertificateFilename = f'{str(len(__certificateChain) - 1 - counter)}-{normalizedSubject}.crt' | ||
|
Comment on lines
-337
to
+333
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
|
||
| # Send the certificate object to the sslCertificateFileName filename | ||
| sendCertificateToFile(sslCertificateFilename, certificateItem) | ||
|
|
@@ -344,16 +340,12 @@ def checkHostname() -> dict: | |
| """Parse --hostname argument.""" | ||
| tmpLine = "" | ||
|
|
||
| # If the ':' is in the hostname argument, then we'll assume it's meant to be a port following the ':'. | ||
| if ":" in args.hostname: | ||
| tmpLine = args.hostname.split(':') | ||
| hostnameQuery = {"hostname": tmpLine[0], "port": int(tmpLine[1])} | ||
|
|
||
| else: | ||
| if ":" not in args.hostname: | ||
| # If no ':' is found, then set default port 443. | ||
| hostnameQuery = {"hostname": args.hostname, "port": 443} | ||
| return {"hostname": args.hostname, "port": 443} | ||
|
|
||
| return hostnameQuery | ||
| tmpLine = args.hostname.split(':') | ||
| return {"hostname": tmpLine[0], "port": int(tmpLine[1])} | ||
|
Comment on lines
-347
to
+348
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ): |
||
|
|
||
|
|
||
| def getCAcertPEM() -> None: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
parseArgumentsrefactored with the following changes:use-fstring-for-concatenation)