From 81a5ee50868757014c32f387634e7ef165850e59 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 10:46:41 +0200 Subject: [PATCH 1/9] Fix a typo in the exception string --- aes_gcm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aes_gcm.py b/aes_gcm.py index 04e496b..ec6f112 100755 --- a/aes_gcm.py +++ b/aes_gcm.py @@ -50,7 +50,7 @@ def __str__(self): class InvalidTagException(Exception): def __str__(self): - return 'The authenticaiton tag is invalid.' + return 'The authentication tag is invalid.' # Galois/Counter Mode with AES-128 and 96-bit IV From 3cc153507ab0230692ee96e46abadb6efbab0a67 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 12:12:14 +0200 Subject: [PATCH 2/9] Enable 192/256 bit keys and usage with bytearrays --- aes_gcm.py | 25 +++++++++++++++++--- test.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 86 insertions(+), 6 deletions(-) diff --git a/aes_gcm.py b/aes_gcm.py index ec6f112..a8b258e 100755 --- a/aes_gcm.py +++ b/aes_gcm.py @@ -59,10 +59,20 @@ def __init__(self, master_key): self.change_key(master_key) def change_key(self, master_key): - if master_key >= (1 << 128): - raise InvalidInputException('Master key should be 128-bit') + if type(master_key) == str: + # Assume this is a good key in a bytearray/bytes + self.__master_key = master_key + elif master_key <= (1 << 128): + self.__master_key = long_to_bytes(master_key, 16) + elif master_key <= (1 << 192): + self.__master_key = long_to_bytes(master_key, 24) + elif master_key <= (1 << 256): + self.__master_key = long_to_bytes(master_key, 32) + + if len(self.__master_key) > 32: + raise InvalidInputException( + 'Master key should be 128, 192 or 256-bit, (got: %s)' % len(self.__master_key * 8)) - self.__master_key = long_to_bytes(master_key, 16) self.__aes_ecb = AES.new(self.__master_key, AES.MODE_ECB) self.__auth_key = bytes_to_long(self.__aes_ecb.encrypt(b'\x00' * 16)) @@ -110,6 +120,9 @@ def __ghash(self, aad, txt): return tag def encrypt(self, init_value, plaintext, auth_data=b''): + if type(init_value) == str: + # Assume the IV is provided as bytes + init_value = bytes_to_long(init_value) if init_value >= (1 << 96): raise InvalidInputException('IV should be 96-bit') # a naive checking for IV reuse @@ -148,6 +161,12 @@ def encrypt(self, init_value, plaintext, auth_data=b''): return ciphertext, auth_tag def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''): + # Assume the IV and/or auth tag are provided as byte arrays when they look like strings + if type(init_value) == str: + init_value = bytes_to_long(init_value) + if type(auth_tag) == str: + auth_tag = bytes_to_long(auth_tag) + if init_value >= (1 << 96): raise InvalidInputException('IV should be 96-bit') if auth_tag >= (1 << 128): diff --git a/test.py b/test.py index 4231d33..c1dca80 100755 --- a/test.py +++ b/test.py @@ -25,7 +25,8 @@ from aes_gcm import AES_GCM from pprint import pprint from Crypto.Random.random import getrandbits -from Crypto.Util.number import long_to_bytes +from Crypto.Util.number import long_to_bytes, bytes_to_long +from Crypto.Hash import SHA256 test_cases = ({ 'master_key': 0x00000000000000000000000000000000, @@ -34,6 +35,57 @@ 'init_value': 0x000000000000000000000000, 'ciphertext': b'', 'auth_tag': 0x58e2fccefa7e3061367f1d57a4e7455a, +}, { + 'name': '192 bit key', + 'master_key': 0xfffffffffffffffffffffffffffffffff, + 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'auth_data': b'', + 'init_value': 0x000000000000000000000000, + 'ciphertext': b'\x72\x3e\x3e\x28\x03\x9e\xb7\x8c' + + b'\xd2\x95\x39\x7f\x9c\x27\x08\x32', + 'auth_tag': 0xe2deabd0b0d93a417facac9df9f6cf91, +}, { + 'name': '256 bit key as byte array, IV as byte array', + 'master_key': SHA256.new("").digest(), + 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'auth_data': b'', + 'init_value': b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'ciphertext': b'\x09\x33\x71\x35\x75\xe5\x1b\x11' + + b'\xca\xab\x53\x99\xb8\x8d\x48\xc6', + 'auth_tag': 0x761b53ebf18f95502fcd10865ba91e17, +}, { + 'name': '256 bit key as byte array, IV and auth tag as byte array', + 'master_key': SHA256.new("").digest(), + 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'auth_data': b'', + 'init_value': b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'ciphertext': b'\x09\x33\x71\x35\x75\xe5\x1b\x11' + + b'\xca\xab\x53\x99\xb8\x8d\x48\xc6', + 'auth_tag': b'\x76\x1b\x53\xeb\xf1\x8f\x95\x50' + + b'\x2f\xcd\x10\x86\x5b\xa9\x1e\x17', +}, { + 'name': '256 bit key as byte array', + 'master_key': SHA256.new("").digest(), + 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'auth_data': b'', + 'init_value': 0x000000000000000000000000, + 'ciphertext': b'\x09\x33\x71\x35\x75\xe5\x1b\x11' + + b'\xca\xab\x53\x99\xb8\x8d\x48\xc6', + 'auth_tag': 0x761b53ebf18f95502fcd10865ba91e17, +}, { + 'name': '256 bit key', + 'master_key': bytes_to_long(SHA256.new("").digest()), + 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x00\x00', + 'auth_data': b'', + 'init_value': 0x000000000000000000000000, + 'ciphertext': b'\x09\x33\x71\x35\x75\xe5\x1b\x11' + + b'\xca\xab\x53\x99\xb8\x8d\x48\xc6', + 'auth_tag': 0x761b53ebf18f95502fcd10865ba91e17, }, { 'master_key': 0x00000000000000000000000000000000, 'plaintext': b'\x00\x00\x00\x00\x00\x00\x00\x00' + @@ -94,12 +146,18 @@ num_failures = 0 for test_data in test_cases: + test_tag = test_data['auth_tag'] + if type(test_data['auth_tag']) == str: + test_tag = bytes_to_long(test_data['auth_tag']) + test_gcm = AES_GCM(test_data['master_key']) encrypted, tag = test_gcm.encrypt( test_data['init_value'], test_data['plaintext'], test_data['auth_data'] ) + enc_dbg = '\\x' + '\\x'.join('{:02x}'.format(ord(x)) for x in encrypted) + tag_dbg = hex(tag) states = [] tags = [] @@ -129,16 +187,19 @@ decrypted = test_gcm.decrypt( test_data['init_value'], encrypted, - tag, + test_data['auth_tag'], test_data['auth_data'] ) if encrypted != test_data['ciphertext'] or \ - tag != test_data['auth_tag'] or \ + tag != test_tag or \ decrypted != test_data['plaintext']: num_failures += 1 print('This test case failed:') pprint(test_data) + print("Encrypted: %s (%s)" % (enc_dbg, encrypted == test_data['ciphertext'])) + print("Tag: %s (%s)" % (tag_dbg, tag == test_data['auth_tag'])) + print("Decrypted: %s (%s)" % (decrypted, decrypted == test_data['plaintext'])) print() if num_failures == 0: From 11c26782ea9c483dd158aec87ca58f89a4d10d11 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 12:14:20 +0200 Subject: [PATCH 3/9] Update README with key sizes --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 90758d0..ef0a8ce 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ A Python implementation of the authenticated encryption mode [Galois/Counter Mode (GCM)](http://en.wikipedia.org/wiki/Galois/Counter_Mode). -Currently it supports only 128-bit AES and 96-bit nonce. +Currently it supports only 128, 192 and 256-bit AES and 96-bit nonce. ## Dependencies From f5c6e37fec4a51ddd870475db92a482f917a40b8 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 12:23:48 +0200 Subject: [PATCH 4/9] Make it python3 + 2 compatible --- aes_gcm.py | 10 +++++----- test.py | 9 ++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/aes_gcm.py b/aes_gcm.py index a8b258e..8ae9da8 100755 --- a/aes_gcm.py +++ b/aes_gcm.py @@ -59,8 +59,8 @@ def __init__(self, master_key): self.change_key(master_key) def change_key(self, master_key): - if type(master_key) == str: - # Assume this is a good key in a bytearray/bytes + if type(master_key) in [bytes, str]: + # Assume this is a good key in a bytearray/bytes/string self.__master_key = master_key elif master_key <= (1 << 128): self.__master_key = long_to_bytes(master_key, 16) @@ -120,7 +120,7 @@ def __ghash(self, aad, txt): return tag def encrypt(self, init_value, plaintext, auth_data=b''): - if type(init_value) == str: + if type(init_value) in [bytes, str]: # Assume the IV is provided as bytes init_value = bytes_to_long(init_value) if init_value >= (1 << 96): @@ -162,9 +162,9 @@ def encrypt(self, init_value, plaintext, auth_data=b''): def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''): # Assume the IV and/or auth tag are provided as byte arrays when they look like strings - if type(init_value) == str: + if type(init_value) in [bytes, str]: init_value = bytes_to_long(init_value) - if type(auth_tag) == str: + if type(auth_tag) in [bytes, str]: auth_tag = bytes_to_long(auth_tag) if init_value >= (1 << 96): diff --git a/test.py b/test.py index c1dca80..4546b1c 100755 --- a/test.py +++ b/test.py @@ -147,16 +147,19 @@ for test_data in test_cases: test_tag = test_data['auth_tag'] - if type(test_data['auth_tag']) == str: + if type(test_data['auth_tag']) in [bytes, str]: test_tag = bytes_to_long(test_data['auth_tag']) - + test_gcm = AES_GCM(test_data['master_key']) encrypted, tag = test_gcm.encrypt( test_data['init_value'], test_data['plaintext'], test_data['auth_data'] ) - enc_dbg = '\\x' + '\\x'.join('{:02x}'.format(ord(x)) for x in encrypted) + if type(encrypted) == str: + enc_dbg = '\\x' + '\\x'.join('{:02x}'.format(ord(x)) for x in encrypted) + else: + enc_dbg = '\\x' + '\\x'.join('{:02x}'.format(x) for x in encrypted) tag_dbg = hex(tag) states = [] From 1531af33c9783954bf72baf52675195fbcad3436 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 12:58:31 +0200 Subject: [PATCH 5/9] Remove 'only' from README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ef0a8ce..9f9c729 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ A Python implementation of the authenticated encryption mode [Galois/Counter Mode (GCM)](http://en.wikipedia.org/wiki/Galois/Counter_Mode). -Currently it supports only 128, 192 and 256-bit AES and 96-bit nonce. +Currently it supports 128, 192 and 256-bit AES and 96-bit nonce. ## Dependencies From f3f6c5c4e0e274ff730937c90463acdfa6b5c6d7 Mon Sep 17 00:00:00 2001 From: asmw Date: Sun, 30 Jul 2017 13:01:16 +0200 Subject: [PATCH 6/9] Print the status of the correct condition for a test failure --- test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.py b/test.py index 4546b1c..4bbf223 100755 --- a/test.py +++ b/test.py @@ -201,7 +201,7 @@ print('This test case failed:') pprint(test_data) print("Encrypted: %s (%s)" % (enc_dbg, encrypted == test_data['ciphertext'])) - print("Tag: %s (%s)" % (tag_dbg, tag == test_data['auth_tag'])) + print("Tag: %s (%s)" % (tag_dbg, tag == test_tag)) print("Decrypted: %s (%s)" % (decrypted, decrypted == test_data['plaintext'])) print() From 224e9c75d197872e4f6f00f9892d75064b845c04 Mon Sep 17 00:00:00 2001 From: asmw Date: Sat, 5 Aug 2017 20:01:44 +0200 Subject: [PATCH 7/9] Initial test script to run NIST test vectors --- test_nist.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100755 test_nist.py diff --git a/test_nist.py b/test_nist.py new file mode 100755 index 0000000..7941243 --- /dev/null +++ b/test_nist.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import fileinput +import sys +from Crypto.Util.number import long_to_bytes, bytes_to_long + +from aes_gcm import AES_GCM, InvalidTagException + +current_test_parameters = {} +current_test = {} +success_count = 0 +fail_count = 0 + +def process(line): + global current_test + global success_count + global fail_count + sline = line.strip() + if sline.startswith("["): + data = sline[1:-1] + key, value = data.split("=", 1) + current_test_parameters[key.strip()] = int(value) + elif (sline == "" and not current_test) or line.startswith("#"): + return + elif sline == "" and 'count' in current_test.keys(): + errors = [] + if 'PT' not in current_test.keys(): + current_test['PT'] = '' + test_gcm = AES_GCM(int(current_test['Key'],16)) + test_aad = b'' if (len(current_test['AAD']) == 0) else long_to_bytes(int(current_test['AAD'], 16)) + test_tag = b'' if (len(current_test['Tag']) == 0) else int(current_test['Tag'], 16) + test_crypttext = b'' if (len(current_test['CT']) == 0) else long_to_bytes(int(current_test['CT'], 16)) + test_plaintext = b'' if (len(current_test['PT']) == 0) else long_to_bytes(int(current_test['PT'], 16)) + test_iv = int(current_test['IV'], 16) + computed_crypttext, computed_tag = test_gcm.encrypt( + test_iv, + test_plaintext, + test_aad) + if computed_tag != test_tag: + errors.append("Tag mismatch after encryption") + computed_plaintext = b'' + try: + computed_plaintext = test_gcm.decrypt(test_iv, test_crypttext, test_tag, test_aad) + if computed_plaintext != test_plaintext: + errors.append("Plaintext mismatch") + except InvalidTagException: + errors.append("Tag mismatch while decrypting") + test_passed = current_test['fail'] == (len(errors) > 0) + if not test_passed: + fail_count += 1 + print("\n\nFailed test %s" % current_test['count']) + print("Parameters:") + print(current_test_parameters) + print("Test case:") + print(current_test) + print(errors) + print("Crypttext:", test_crypttext, computed_crypttext) + print("Plaintext:", test_plaintext, computed_plaintext) + print("Tags:", test_tag, computed_tag) + print("Failed: %s | Success: %s" % (fail_count, success_count)) + else: + success_count += 1 + current_test = None + elif line.startswith("Count ="): + current_test = { + 'count': int(line.split("=", 1)[1]), + 'fail': False + } + elif " = " in line: + name, value = line.split(" = ", 1) + current_test[name.strip()] = value.strip() + elif sline == "FAIL": + current_test['fail'] = True + else: + print("unknown line: %s" % line) + +print("Parsing") + +for line in fileinput.input(): + process(line) + +print("Success: %s" % success_count) +print("Failed: %s" % fail_count) From 40dff6fc8163651baa769d194bfcc4f3056f5214 Mon Sep 17 00:00:00 2001 From: asmw Date: Sat, 5 Aug 2017 22:48:41 +0200 Subject: [PATCH 8/9] Support tag length != 128 bit --- aes_gcm.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/aes_gcm.py b/aes_gcm.py index 8ae9da8..f6b7e71 100755 --- a/aes_gcm.py +++ b/aes_gcm.py @@ -119,7 +119,7 @@ def __ghash(self, aad, txt): return tag - def encrypt(self, init_value, plaintext, auth_data=b''): + def encrypt(self, init_value, plaintext, auth_data=b'', tag_len = 16): if type(init_value) in [bytes, str]: # Assume the IV is provided as bytes init_value = bytes_to_long(init_value) @@ -158,13 +158,15 @@ def encrypt(self, init_value, plaintext, auth_data=b''): # assert len(ciphertext) == len(plaintext) assert auth_tag < (1 << 128) + auth_tag = auth_tag >> (16 - tag_len) * 8 return ciphertext, auth_tag - def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''): + def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b'', tag_len = 16): # Assume the IV and/or auth tag are provided as byte arrays when they look like strings if type(init_value) in [bytes, str]: init_value = bytes_to_long(init_value) if type(auth_tag) in [bytes, str]: + tag_len = int(len(auth_tag) / 8) auth_tag = bytes_to_long(auth_tag) if init_value >= (1 << 96): @@ -172,9 +174,11 @@ def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''): if auth_tag >= (1 << 128): raise InvalidInputException('Tag should be 128-bit') - if auth_tag != self.__ghash(auth_data, ciphertext) ^ \ + ghash = self.__ghash(auth_data, ciphertext) ^ \ bytes_to_long(self.__aes_ecb.encrypt( - long_to_bytes((init_value << 32) | 1, 16))): + long_to_bytes((init_value << 32) | 1, 16))) + ghash = ghash >> (16 - tag_len) * 8 + if auth_tag != ghash: raise InvalidTagException len_ciphertext = len(ciphertext) From 75b3cd9528ac8e99949fd49ff6ef0d318248152d Mon Sep 17 00:00:00 2001 From: asmw Date: Sat, 5 Aug 2017 22:49:08 +0200 Subject: [PATCH 9/9] Nicer nist test output --- test_nist.py | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/test_nist.py b/test_nist.py index 7941243..abfd6a5 100755 --- a/test_nist.py +++ b/test_nist.py @@ -31,15 +31,25 @@ def process(line): test_crypttext = b'' if (len(current_test['CT']) == 0) else long_to_bytes(int(current_test['CT'], 16)) test_plaintext = b'' if (len(current_test['PT']) == 0) else long_to_bytes(int(current_test['PT'], 16)) test_iv = int(current_test['IV'], 16) - computed_crypttext, computed_tag = test_gcm.encrypt( - test_iv, - test_plaintext, - test_aad) + tag_len = int(int(current_test_parameters['Taglen']) / 8) + try: + computed_crypttext, computed_tag = test_gcm.encrypt( + test_iv, + test_plaintext, + test_aad, + tag_len) + except ValueError as e: + errors.append(e) if computed_tag != test_tag: errors.append("Tag mismatch after encryption") computed_plaintext = b'' try: - computed_plaintext = test_gcm.decrypt(test_iv, test_crypttext, test_tag, test_aad) + computed_plaintext = test_gcm.decrypt( + test_iv, + test_crypttext, + test_tag, + test_aad, + tag_len) if computed_plaintext != test_plaintext: errors.append("Plaintext mismatch") except InvalidTagException: @@ -53,9 +63,15 @@ def process(line): print("Test case:") print(current_test) print(errors) - print("Crypttext:", test_crypttext, computed_crypttext) - print("Plaintext:", test_plaintext, computed_plaintext) - print("Tags:", test_tag, computed_tag) + print("Crypttext") + print(" Test: %s" % test_crypttext) + print(" Computed: %s" % computed_crypttext) + print("Plaintext") + print(" Test: %s" % test_plaintext) + print(" Computed: %s" % computed_plaintext) + print("Tags") + print(" Test: %s" % hex(test_tag)) + print(" Computed: %s" % hex(computed_tag)) print("Failed: %s | Success: %s" % (fail_count, success_count)) else: success_count += 1 @@ -75,8 +91,14 @@ def process(line): print("Parsing") +total = 0 +last = 0 for line in fileinput.input(): process(line) + total = success_count + fail_count + if (total % 20) == 0 and last != total: + print("Failed: %s | Success: %s" % (fail_count, success_count)) + last = total print("Success: %s" % success_count) print("Failed: %s" % fail_count)