From a6b9d5012cd6b35ed8083d2554c404205692ce4a Mon Sep 17 00:00:00 2001 From: Jimmy Song Date: Tue, 21 Nov 2023 00:51:07 -0600 Subject: [PATCH 1/2] Added experimental FROST support --- buidl/cecc.py | 3 + buidl/frost.py | 231 +++++++++++++++++++++++++++++++++++++++ buidl/pecc.py | 3 + buidl/test/test_frost.py | 49 +++++++++ 4 files changed, 286 insertions(+) create mode 100644 buidl/frost.py create mode 100644 buidl/test/test_frost.py diff --git a/buidl/cecc.py b/buidl/cecc.py index 4046854..0edb69d 100644 --- a/buidl/cecc.py +++ b/buidl/cecc.py @@ -47,6 +47,9 @@ def __init__(self, csec=None, usec=None): def __eq__(self, other): return self.sec() == other.sec() + def __hash__(self): + return hash(self.sec()) + def __repr__(self): return f"S256Point({self.sec().hex()})" diff --git a/buidl/frost.py b/buidl/frost.py new file mode 100644 index 0000000..a40037d --- /dev/null +++ b/buidl/frost.py @@ -0,0 +1,231 @@ +from secrets import randbelow + +from buidl.ecc import N, G, S256Point, SchnorrSignature +from buidl.helper import ( + big_endian_to_int, + encode_varint, + int_to_big_endian, +) +from buidl.hash import hash_challenge +from buidl.phash import tagged_hash + + +def hash_frost_keygen(m): + """Hash used for cooperative key generation. This should be a tagged hash""" + return tagged_hash(b"FROST/keygen", m) + + +def hash_frost_commitment(m): + """Hash used for message commitment in signing. This should be a tagged hash""" + return tagged_hash(b"FROST/commitment", m) + + +class FrostParticipant: + """Represents a participant in a t-of-n FROST""" + + def __init__(self, t, n, index): + # t-of-n FROST with this one being at index in [1, n] + self.t = t + self.n = n + self.index = index + self.keygen_coefficients = None + self.coefficient_commitments = [[] for _ in range(self.n)] + self.shares_from = [None for _ in range(self.n)] + + def key_generation_round_1(self, name): + if self.keygen_coefficients is not None: + raise ValueError("secrets have already been defined") + # generate t random numbers for a Shamir polynomial + self.keygen_coefficients = [randbelow(N) for _ in range(self.t)] + my_commitments = [coef * G for coef in self.keygen_coefficients] + self.coefficient_commitments[self.index] = my_commitments + k = randbelow(N) # TODO: change this to use the k generation from bip340 + r = k * G + c = hash_frost_keygen( + encode_varint(self.index) + name + my_commitments[0].xonly() + r.xonly() + ) + # proof proves that we know the first coefficient + proof = (k + self.keygen_coefficients[0] * big_endian_to_int(c)) % N + return (my_commitments, r, proof) + + def poly_value(self, x): + """return the polynomial value f(x) for the polynomial defined by the secrets""" + result = 0 + for coef_index in range(self.t): + result += self.keygen_coefficients[coef_index] * x**coef_index % N + return result % N + + def verify_round_1(self, name, participant_index, commitments, r, proof): + """check that the commitment at index 0, r and proof are valid""" + if participant_index == self.index: + return + c = hash_frost_keygen( + encode_varint(participant_index) + name + commitments[0].xonly() + r.xonly() + ) + if r != -big_endian_to_int(c) * commitments[0] + proof: + raise RuntimeError("commitment does not correspond to proof") + self.coefficient_commitments[participant_index] = commitments + + def key_generation_round_2(self): + """Deal out shares to each participant corresponding to their index + 1""" + shares = [] + for participant_index in range(self.n): + shares.append(self.poly_value(participant_index + 1)) + self.shares_from[self.index] = shares[self.index] + shares[self.index] = None + return shares + + def verify_round_2(self, participant_index, share): + """Check that we have a valid point in the committed Shamir polynomial + from this participant""" + if participant_index == self.index: + return + commitments = self.coefficient_commitments[participant_index] + x = self.index + 1 + target = share * G + points = [] + for coef_index in range(self.t): + coef = x**coef_index % N + points.append(coef * commitments[coef_index]) + if S256Point.combine(points) != target: + raise RuntimeError("share does not correspond to the commitment") + self.shares_from[participant_index] = share + + def compute_keys(self): + """Now compute the pubkeys for each participant and the secret share for + our pubkey""" + self.pubkeys = [] + for _ in range(self.n): + points = [] + for participant_index in range(self.n): + for coef_index in range(self.t): + coef = (self.index + 1) ** coef_index % N + points.append( + coef + * self.coefficient_commitments[participant_index][coef_index] + ) + self.pubkeys.append(S256Point.combine(points)) + # the constant term of the combined polynomial is the pubkey + self.group_pubkey = S256Point.combine( + [ + self.coefficient_commitments[participant_index][0] + for participant_index in range(self.n) + ] + ) + # the secret shares that were dealt to us, we now combine for the secret + self.secret = sum(self.shares_from) % N + # sanity check against the public key we computed + self.pubkey = self.pubkeys[self.index] + if self.secret * G != self.pubkey: + raise RuntimeError("something wrong with the secret") + # if we have an odd group key, negate everything + if self.group_pubkey.parity: + # negate the pubkeys, the group pubkey and our secret + self.pubkeys = [-1 * p for p in self.pubkeys] + self.group_pubkey = -1 * self.group_pubkey + self.secret = N - self.secret + self.pubkey = self.pubkeys[self.index] + return self.group_pubkey + + def generate_nonce_pairs(self, num=200): + """We now deal to everyone the nonces we will be using for signing. + Each signing requires a pair of nonces and we return the nonce commitments""" + # create two nonces for use in the signing + self.nonces = {} + self.nonce_pubs = [] + for _ in range(num): + # this should probably involve some deterministic process involving + # the private key + nonce_1, nonce_2 = randbelow(N), randbelow(N) + nonce_pub_1 = nonce_1 * G + nonce_pub_2 = nonce_2 * G + self.nonces[nonce_pub_1] = (nonce_1, nonce_2) + self.nonce_pubs.append((nonce_pub_1, nonce_pub_2)) + return self.nonce_pubs + + def register_nonce_pubs(self, nonce_pubs_list): + """When we receive the nonce commitments, we store them""" + self.nonces_available = [] + for nonce_pubs in nonce_pubs_list: + nonce_lookup = {} + for nonce_pub_1, nonce_pub_2 in nonce_pubs: + nonce_lookup[(nonce_pub_1, nonce_pub_2)] = True + self.nonces_available.append(nonce_lookup) + + def compute_group_r(self, msg, nonces_to_use): + """The R that we use for signing can be computed based on the nonces + we are using and the message that we're signing""" + # add up the first nonces as normal + ds = [] + for key in sorted(nonces_to_use.keys()): + value = nonces_to_use[key] + ds.append(value[0]) + result = [S256Point.combine(ds)] + # the second nonces need to be multiplied by the commitment + for key in sorted(nonces_to_use.keys()): + value = nonces_to_use[key] + commitment = ( + big_endian_to_int( + hash_frost_commitment( + msg + encode_varint(key) + value[0].xonly() + value[1].xonly() + ) + ) + % N + ) + result.append(commitment * value[1]) + return S256Point.combine(result) + + def sign(self, msg, nonces_to_use): + """Sign using our secret share given the nonces we are supposed to use""" + group_r = self.compute_group_r(msg, nonces_to_use) + # compute the lagrange coefficient based on the participants + lagrange = 1 + for key in sorted(nonces_to_use.keys()): + value = nonces_to_use[key] + if not self.nonces_available[key][value]: + raise ValueError("Using an unknown or already used nonce") + if key == self.index: + my_commitment = ( + big_endian_to_int( + hash_frost_commitment( + msg + + encode_varint(key) + + value[0].xonly() + + value[1].xonly() + ) + ) + % N + ) + else: + lagrange *= (key + 1) * pow(key - self.index, -1, N) % N + # the group challenge is the normal Schnorr Signature challenge from BIP340 + challenge = big_endian_to_int( + hash_challenge(group_r.xonly() + self.group_pubkey.xonly() + msg) + ) + # use the two nonces to compute the k we will use + my_d, my_e = self.nonces[nonces_to_use[self.index][0]] + my_k = my_d + my_e * my_commitment + d_pub, e_pub = my_d * G, my_e * G + my_r = S256Point.combine([d_pub, my_commitment * e_pub]) + # if the group r is odd, we negate everything + if group_r.parity: + group_r = -1 * group_r + my_k = N - my_k + my_r = -1 * my_r + sig_share = (my_k + lagrange * self.secret * challenge) % N + # sanity check the s we generated + second = (challenge * lagrange % N) * self.pubkey + if -1 * second + sig_share != my_r: + raise RuntimeError("signature didn't do what we expected") + # delete nonce used + for key in sorted(nonces_to_use.keys()): + value = nonces_to_use[key] + del self.nonces_available[key][value] + return sig_share + + def combine_shares(self, shares, msg, nonces_to_use): + """Convenience method to return a Schnorr Signature once + the participants have returned their shares""" + r = self.compute_group_r(msg, nonces_to_use) + s = sum(shares) % N + return SchnorrSignature.parse(r.xonly() + int_to_big_endian(s, 32)) diff --git a/buidl/pecc.py b/buidl/pecc.py index fdce158..1d481a3 100644 --- a/buidl/pecc.py +++ b/buidl/pecc.py @@ -231,6 +231,9 @@ def __init__(self, x, y, a=None, b=None): def __eq__(self, other): return self.x == other.x and self.y == other.y + def __hash__(self): + return hash(self.sec()) + def __repr__(self): if self.x is None: return "S256Point(infinity)" diff --git a/buidl/test/test_frost.py b/buidl/test/test_frost.py new file mode 100644 index 0000000..ea62f52 --- /dev/null +++ b/buidl/test/test_frost.py @@ -0,0 +1,49 @@ +from itertools import combinations +from unittest import TestCase + +from buidl.frost import FrostParticipant +from buidl.helper import sha256 + + +class FrostTest(TestCase): + def test_frost(self): + # create a three participant frost + tests = [ + (1, 2), + (2, 3), + (3, 5), + (4, 7), + (5, 9), + (3, 3), + (4, 8), + ] + for t, n in tests: + participants = [FrostParticipant(t, n, i) for i in range(n)] + round_1_data = [] + key_name = b"test" + for p in participants: + round_1_data.append(p.key_generation_round_1(key_name)) + for p in participants: + for i in range(n): + p.verify_round_1(key_name, i, *round_1_data[i]) + for i, p in enumerate(participants): + for j, share in enumerate(p.key_generation_round_2()): + participants[j].verify_round_2(i, share) + for p in participants: + group_pubkey = p.compute_keys() + self.assertFalse(group_pubkey.parity) + combos = combinations(participants, t) + num_nonces = len([0 for _ in combos]) + nonce_pubs = [] + for p in participants: + nonce_pubs.append(p.generate_nonce_pairs(num_nonces)) + for p in participants: + p.register_nonce_pubs(nonce_pubs) + msg = sha256(b"I am testing FROST") + for ps in combinations(participants, t): + nonces_to_use = {p.index: nonce_pubs[p.index].pop() for p in ps} + shares = [] + for p in ps: + shares.append(p.sign(msg, nonces_to_use)) + schnorr_sig = ps[0].combine_shares(shares, msg, nonces_to_use) + self.assertTrue(group_pubkey.verify_schnorr(msg, schnorr_sig)) From 62892e0d13cc0635bc11f8351b29625410cea646 Mon Sep 17 00:00:00 2001 From: Jimmy Song Date: Tue, 21 Nov 2023 14:55:20 -0600 Subject: [PATCH 2/2] WIP --- buidl/cecc.py | 9 +- buidl/frost.py | 590 +++++++++++++++++++++++++++++---------- buidl/test/test_frost.py | 144 ++++++++-- 3 files changed, 565 insertions(+), 178 deletions(-) diff --git a/buidl/cecc.py b/buidl/cecc.py index 0edb69d..34b3304 100644 --- a/buidl/cecc.py +++ b/buidl/cecc.py @@ -402,9 +402,11 @@ def sign(self, z): raise RuntimeError("generated signature doesn't verify") return sig - def sign_schnorr(self, msg, aux): + def sign_schnorr(self, msg, aux=None): if len(msg) != 32: raise ValueError("msg needs to be 32 bytes") + if aux is None: + aux = b"\x00" * 32 if len(aux) != 32: raise ValueError("aux needs to be 32 bytes") # per libsecp256k1 documentation, this helps against side-channel attacks @@ -421,7 +423,10 @@ def sign_schnorr(self, msg, aux): raw_sig = ffi.new("unsigned char [64]") if not lib.secp256k1_schnorrsig_sign(GLOBAL_CTX, raw_sig, msg, keypair, aux): raise RuntimeError("libsecp256k1 schnorr signing problem") - return SchnorrSignature(bytes(ffi.buffer(raw_sig, 64))) + schnorr = SchnorrSignature(bytes(ffi.buffer(raw_sig, 64))) + if not self.point.verify_schnorr(msg, schnorr): + raise RuntimeError("Bad Signature") + return schnorr def deterministic_k(self, z): k = b"\x00" * 32 diff --git a/buidl/frost.py b/buidl/frost.py index a40037d..1141d60 100644 --- a/buidl/frost.py +++ b/buidl/frost.py @@ -1,6 +1,6 @@ from secrets import randbelow -from buidl.ecc import N, G, S256Point, SchnorrSignature +from buidl.ecc import N, G, S256Point, PrivateKey, SchnorrSignature from buidl.helper import ( big_endian_to_int, encode_varint, @@ -20,212 +20,494 @@ def hash_frost_commitment(m): return tagged_hash(b"FROST/commitment", m) +class UpdatePrivatePolynomial: + """Private Polynomial for updating keys. Same as PrivatePolynomial + except it doesn't have a constant term.""" + + def __init__(self, coefficients): + self.coefficients = coefficients + points = [s * G for s in self.coefficients] + self.public = UpdatePublicPolynomial(points) + + def y_value(self, x): + """return the y value y = f(x) where f is the private polynomial""" + result = 0 + # compute y = a_1 * x + a_2 * x^2 + ... + a_(t-1) * x^(t-1) + for coef_index, coef in enumerate(self.coefficients): + result += coef * x ** (coef_index + 1) % N + return result % N + + @classmethod + def generate(cls, t): + return cls([randbelow(N) for _ in range(t - 1)]) + + +class UpdatePublicPolynomial: + """Pulic Polynomial for updating keys. Same as PublicPolynomial + except it doesn't have a constant term.""" + + def __init__(self, points): + self.points = points + + def __repr__(self): + return "\n".join([p.__repr__() for p in self.points]) + + def y_value(self, x): + """return the y value y = f(x) where f is the public polynomial""" + to_sum = [] + # compute y = A_1 * x + A_2 * x^2 + ... + A_(t-1) * x^(t-1) + for coef_index, point in enumerate(self.points): + to_sum.append((x ** (coef_index + 1) % N) * self.points[coef_index]) + return S256Point.combine(to_sum) + + +class PrivatePolynomial: + """Polynomial with scalar coefficients. We can combine many of these + to create a polynomial for Shamir's Secret Sharing.""" + + def __init__(self, coefficients): + self.coefficients = coefficients + # the constant term is the "private key" for this polynomial + self.private_key = PrivateKey(coefficients[0]) + # we compute the corresponding PublicPolynomial which have ECC points + # as coefficients + points = [s * G for s in self.coefficients] + self.public = PublicPolynomial(points) + + def y_value(self, x): + """return the y value y = f(x) where f is the private polynomial""" + result = 0 + # compute y = a_0 + a_1 * x + a_2 * x^2 + ... + a_(t-1) * x^(t-1) + for coef_index, coef in enumerate(self.coefficients): + result += coef * x**coef_index % N + return result % N + + def sign_keygen(self, msg): + """Sign a message to prove that the private key to the public point + is in our posession""" + return self.private_key.sign_schnorr(msg) + + @classmethod + def generate(cls, t): + return cls([randbelow(N) for _ in range(t)]) + + @classmethod + def from_hd(cls, t, hd_priv): + """Get coefficients from the hardened children of a HDPrivateKey""" + return cls([hd_priv.child((1 << 31) + i).private_key.secret for i in range(t)]) + + +class PublicPolynomial: + """Polynomial with ECC Point coefficients. We can combine many of these + to create a public key for the shared secret from Shamir.""" + + def __init__(self, points): + self.points = points + # the constant term of the polynomial is the public key + self.public_key = points[0] + + def __repr__(self): + return "\n".join([p.__repr__() for p in self.points]) + + def y_value(self, x): + """return the y value y = f(x) where f is the public polynomial""" + to_sum = [] + # compute y = A_0 + A_1 * x + A_2 * x^2 + ... + A_(t-1) * x^(t-1) + for coef_index, point in enumerate(self.points): + to_sum.append((x**coef_index % N) * self.points[coef_index]) + return S256Point.combine(to_sum) + + def verify_keygen(self, msg, sig): + return self.public_key.verify_schnorr(msg, sig) + + class FrostParticipant: """Represents a participant in a t-of-n FROST""" - def __init__(self, t, n, index): - # t-of-n FROST with this one being at index in [1, n] - self.t = t - self.n = n - self.index = index - self.keygen_coefficients = None - self.coefficient_commitments = [[] for _ in range(self.n)] - self.shares_from = [None for _ in range(self.n)] + def __init__( + self, + t, + participants, + x, + hd_priv=None, + secret=None, + pubkeys=None, + group_pubkey=None, + ): + self.participants = participants[:] + self.n = len(participants) + if t > self.n: + raise ValueError("t should be less than or equal to n") + # t-of-n FROST + self.t = t # threshold of participants + self.x = x # this participant's x coordinate + self.hd_priv = hd_priv # HDPrivateKey object for generating + self.secret = secret # this participant's secret, or y coordinate + self.pubkeys = pubkeys # the pubkeys of other participants + if secret: + self.pubkey = self.secret * G + self.group_pubkey = group_pubkey # the combined group pubkey + # nonces that other participants have registered with us + self.nonces_available = {x: {} for x in participants} + # these are used in distributed key generation and distributed key + # update. + self.private_polynomial = None + self.public_polynomials = None + self.my_shares = None def key_generation_round_1(self, name): - if self.keygen_coefficients is not None: + """We generate a polynomial which will be combined with other + participants to generate the Shamir Secret Sharing polynomial""" + if self.private_polynomial is not None: raise ValueError("secrets have already been defined") - # generate t random numbers for a Shamir polynomial - self.keygen_coefficients = [randbelow(N) for _ in range(self.t)] - my_commitments = [coef * G for coef in self.keygen_coefficients] - self.coefficient_commitments[self.index] = my_commitments - k = randbelow(N) # TODO: change this to use the k generation from bip340 - r = k * G - c = hash_frost_keygen( - encode_varint(self.index) + name + my_commitments[0].xonly() + r.xonly() - ) - # proof proves that we know the first coefficient - proof = (k + self.keygen_coefficients[0] * big_endian_to_int(c)) % N - return (my_commitments, r, proof) - - def poly_value(self, x): - """return the polynomial value f(x) for the polynomial defined by the secrets""" - result = 0 - for coef_index in range(self.t): - result += self.keygen_coefficients[coef_index] * x**coef_index % N - return result % N + # generate a private Shamir polynomial + if self.hd_priv: + self.private_polynomial = PrivatePolynomial.from_hd(self.t, self.hd_priv) + else: + self.private_polynomial = PrivatePolynomial.generate(self.t) + # the share that we generated for ourselves needs to be registered + self.my_shares = {self.x: self.private_polynomial.y_value(self.x)} + self.public_polynomials = {self.x: self.private_polynomial.public} + # sign with the first coefficient as private key, our x and name context + msg = hash_frost_keygen(encode_varint(self.x) + name) + schnorr_sig = self.private_polynomial.sign_keygen(msg) + return (self.private_polynomial.public, schnorr_sig) - def verify_round_1(self, name, participant_index, commitments, r, proof): - """check that the commitment at index 0, r and proof are valid""" - if participant_index == self.index: + def verify_round_1(self, name, participant_x, public_polynomial, schnorr_sig): + """store the public polynomial for the participant and check the + signature provided for the constant term""" + if participant_x == self.x: return - c = hash_frost_keygen( - encode_varint(participant_index) + name + commitments[0].xonly() + r.xonly() - ) - if r != -big_endian_to_int(c) * commitments[0] + proof: - raise RuntimeError("commitment does not correspond to proof") - self.coefficient_commitments[participant_index] = commitments - - def key_generation_round_2(self): - """Deal out shares to each participant corresponding to their index + 1""" - shares = [] - for participant_index in range(self.n): - shares.append(self.poly_value(participant_index + 1)) - self.shares_from[self.index] = shares[self.index] - shares[self.index] = None - return shares - - def verify_round_2(self, participant_index, share): - """Check that we have a valid point in the committed Shamir polynomial - from this participant""" - if participant_index == self.index: + # check the signature + msg = hash_frost_keygen(encode_varint(participant_x) + name) + if not public_polynomial.verify_keygen(msg, schnorr_sig): + raise RuntimeError("signature for round 1 does not verify") + # register the polynomial for combining + self.public_polynomials[participant_x] = public_polynomial + + def key_generation_round_2(self, participant_x): + """Deal out share to a fellow participant. This gives the y value + for our polynomial. When combined with the y values of other polynomials + the participant will have the y value for the combined group polynomial. + A threshold of such y values can be used to reconstruct the secret""" + return self.private_polynomial.y_value(participant_x) + + def verify_round_2(self, participant_x, share): + """Receive the share from a fellow participant and check that this share + corresponds to the point in the public polynomial they committed to.""" + if participant_x == self.x: return - commitments = self.coefficient_commitments[participant_index] - x = self.index + 1 - target = share * G - points = [] - for coef_index in range(self.t): - coef = x**coef_index % N - points.append(coef * commitments[coef_index]) - if S256Point.combine(points) != target: + public_polynomial = self.public_polynomials[participant_x] + # check our share against the commitments this participant has made + pubkey = public_polynomial.y_value(self.x) + # the result should correspond to the share we got + if pubkey != share * G: raise RuntimeError("share does not correspond to the commitment") - self.shares_from[participant_index] = share + # share is valid, we store it for later processing + self.my_shares[participant_x] = share def compute_keys(self): - """Now compute the pubkeys for each participant and the secret share for - our pubkey""" - self.pubkeys = [] - for _ in range(self.n): - points = [] - for participant_index in range(self.n): - for coef_index in range(self.t): - coef = (self.index + 1) ** coef_index % N - points.append( - coef - * self.coefficient_commitments[participant_index][coef_index] - ) - self.pubkeys.append(S256Point.combine(points)) - # the constant term of the combined polynomial is the pubkey + """Now compute the pubkeys for each participant and the y value to + the group polynomial, which is our secret""" + self.pubkeys = {} + # we go through each participant to compute their pubkeys + # this is done by adding up the y values at x for every public polynomial + # whose sum is the group public polynomial + for x in self.participants: + points = [p.y_value(x) for p in self.public_polynomials.values()] + self.pubkeys[x] = S256Point.combine(points) + # the constant term of the combined polynomial is the group pubkey self.group_pubkey = S256Point.combine( - [ - self.coefficient_commitments[participant_index][0] - for participant_index in range(self.n) - ] + [p.public_key for p in self.public_polynomials.values()] ) + # delete the public polynomials and our private polynomial + self.public_polynomials = None + self.private_polynomial = None # the secret shares that were dealt to us, we now combine for the secret - self.secret = sum(self.shares_from) % N + self.secret = sum(self.my_shares.values()) % N + # delete the shares + self.my_shares = None # sanity check against the public key we computed - self.pubkey = self.pubkeys[self.index] + self.pubkey = self.pubkeys[self.x] if self.secret * G != self.pubkey: raise RuntimeError("something wrong with the secret") - # if we have an odd group key, negate everything + # if we have an odd group key, negate everything to make the x-only + # value good if self.group_pubkey.parity: # negate the pubkeys, the group pubkey and our secret - self.pubkeys = [-1 * p for p in self.pubkeys] + self.pubkeys = {x: -1 * p for x, p in self.pubkeys.items()} self.group_pubkey = -1 * self.group_pubkey self.secret = N - self.secret - self.pubkey = self.pubkeys[self.index] + self.pubkey = self.pubkeys[self.x] return self.group_pubkey def generate_nonce_pairs(self, num=200): - """We now deal to everyone the nonces we will be using for signing. - Each signing requires a pair of nonces and we return the nonce commitments""" - # create two nonces for use in the signing + """We now deal to everyone the nonces we will be using for signing. Each + signing requires a pair of nonces and we return the nonce commitments""" self.nonces = {} self.nonce_pubs = [] for _ in range(num): # this should probably involve some deterministic process involving - # the private key + # some commitments to the group of participants nonce_1, nonce_2 = randbelow(N), randbelow(N) nonce_pub_1 = nonce_1 * G nonce_pub_2 = nonce_2 * G self.nonces[nonce_pub_1] = (nonce_1, nonce_2) self.nonce_pubs.append((nonce_pub_1, nonce_pub_2)) + return self.nonce_pubs[:] + + def extract_nonce_pairs(self, hd_priv, num=200): + """We generate the nonce pairs using an HDPrivateKey as an alternative + to random generation""" + # create two nonces for use in the signing + self.nonces = {} + self.nonce_pubs = [] + for i in range(num): + # use a hardened derivation + child = hd_priv.child((1 << 31) + i) + nonce_1, nonce_2 = ( + child.child(0).private_key.secret, + child.child(1).private_key.secret, + ) + nonce_pub_1 = nonce_1 * G + nonce_pub_2 = nonce_2 * G + self.nonces[nonce_pub_1] = (nonce_1, nonce_2) + self.nonce_pubs.append((nonce_pub_1, nonce_pub_2)) return self.nonce_pubs - def register_nonce_pubs(self, nonce_pubs_list): + def register_nonces(self, x, nonce_pubs): """When we receive the nonce commitments, we store them""" - self.nonces_available = [] - for nonce_pubs in nonce_pubs_list: - nonce_lookup = {} - for nonce_pub_1, nonce_pub_2 in nonce_pubs: - nonce_lookup[(nonce_pub_1, nonce_pub_2)] = True - self.nonces_available.append(nonce_lookup) + nonce_lookup = {} + for nonce_pub_1, nonce_pub_2 in nonce_pubs: + nonce_lookup[(nonce_pub_1, nonce_pub_2)] = True + self.nonces_available[x] = nonce_lookup + + def compute_commitment(self, x, msg, d, e): + """Commitment is what we use to make the k we use deterministic in a way + that's not manipulable by the other players ahead of time""" + h = hash_frost_commitment(msg + encode_varint(x) + d.xonly() + e.xonly()) + return big_endian_to_int(h) % N + + def compute_partial_r(self, x, msg, d, e): + """Return the R_x, or the target for the participant at x""" + commitment = self.compute_commitment(x, msg, d, e) + return S256Point.combine([d, commitment * e]) def compute_group_r(self, msg, nonces_to_use): - """The R that we use for signing can be computed based on the nonces - we are using and the message that we're signing""" - # add up the first nonces as normal - ds = [] - for key in sorted(nonces_to_use.keys()): - value = nonces_to_use[key] - ds.append(value[0]) - result = [S256Point.combine(ds)] - # the second nonces need to be multiplied by the commitment - for key in sorted(nonces_to_use.keys()): - value = nonces_to_use[key] - commitment = ( - big_endian_to_int( - hash_frost_commitment( - msg + encode_varint(key) + value[0].xonly() + value[1].xonly() - ) - ) - % N - ) - result.append(commitment * value[1]) + """The R that we use for signing is the sum of all the R_x's + from the participants""" + result = [] + participants = sorted(nonces_to_use.keys()) + for x in participants: + d, e = nonces_to_use[x] + partial_r = self.compute_partial_r(x, msg, d, e) + result.append(partial_r) return S256Point.combine(result) - def sign(self, msg, nonces_to_use): + def lagrange_coefficient(self, x, participants): + """This calculates the value of the lagrange interpolating polynomial + at 0. Multiplied by the secret, this represents the participant's + additive portion of the group secret.""" + result = 1 + # compute Σ(p_x/(p_x-x)) where p_x != x + for p_x in participants: + if p_x != x: + result *= p_x * pow(p_x - x, N - 2, N) % N + return result + + def lagrange_y_value(self, p_xs, new_participant_x): + """This calculates the value of the lagrange interpolating polynomial + at x for this participant's x. If collected from all participants, a + new FrostParticipant can be added. But we can't just send the value + directly as it will reveal our secret, so it'll subsequently be split""" + result = 1 + for p_x in p_xs: + if p_x != self.x: + result *= (new_participant_x - p_x) * pow(self.x - p_x, N - 2, N) % N + return result * self.secret % N + + def sign(self, msg, nonces_to_use, tweak=None): """Sign using our secret share given the nonces we are supposed to use""" group_r = self.compute_group_r(msg, nonces_to_use) # compute the lagrange coefficient based on the participants - lagrange = 1 - for key in sorted(nonces_to_use.keys()): - value = nonces_to_use[key] - if not self.nonces_available[key][value]: - raise ValueError("Using an unknown or already used nonce") - if key == self.index: - my_commitment = ( - big_endian_to_int( - hash_frost_commitment( - msg - + encode_varint(key) - + value[0].xonly() - + value[1].xonly() - ) - ) - % N - ) + participants = sorted(nonces_to_use.keys()) + lagrange = self.lagrange_coefficient(self.x, participants) + # use the two nonces to compute the k we will use + d, e = nonces_to_use[self.x] + my_commitment = self.compute_commitment(self.x, msg, d, e) + my_d, my_e = self.nonces[nonces_to_use[self.x][0]] + my_k = my_d + my_e * my_commitment + my_r = self.compute_partial_r(self.x, msg, d, e) + # adjust the group pubkey, our secret and pubkey by the amount of the tweak + if tweak: + t = big_endian_to_int(tweak) + group_pubkey = self.group_pubkey + t + # if the tweaked pubkey is odd, we negate the secret and pubkey + if group_pubkey.parity: + secret = N - (self.secret + t) + pubkey = -1 * (self.pubkey + t) else: - lagrange *= (key + 1) * pow(key - self.index, -1, N) % N + secret = self.secret + t + pubkey = self.pubkey + t + else: + group_pubkey = self.group_pubkey + secret = self.secret + pubkey = self.pubkey # the group challenge is the normal Schnorr Signature challenge from BIP340 challenge = big_endian_to_int( - hash_challenge(group_r.xonly() + self.group_pubkey.xonly() + msg) + hash_challenge(group_r.xonly() + group_pubkey.xonly() + msg) ) - # use the two nonces to compute the k we will use - my_d, my_e = self.nonces[nonces_to_use[self.index][0]] - my_k = my_d + my_e * my_commitment - d_pub, e_pub = my_d * G, my_e * G - my_r = S256Point.combine([d_pub, my_commitment * e_pub]) - # if the group r is odd, we negate everything + # if the group r is odd, we negate the k and r if group_r.parity: - group_r = -1 * group_r my_k = N - my_k my_r = -1 * my_r - sig_share = (my_k + lagrange * self.secret * challenge) % N + working_secret = lagrange * secret + working_pubkey = lagrange * pubkey + # this is the partial signature, which added with a threshold number of + # participants creates the signature as would be produced by the group + # secret, which no one knows, and which validate using the group pubkey + sig_share = (my_k + working_secret * challenge) % N # sanity check the s we generated - second = (challenge * lagrange % N) * self.pubkey - if -1 * second + sig_share != my_r: - raise RuntimeError("signature didn't do what we expected") + commitment = challenge * working_pubkey + if -1 * commitment + sig_share != my_r: + raise RuntimeError("signature didn't validate") # delete nonce used - for key in sorted(nonces_to_use.keys()): - value = nonces_to_use[key] - del self.nonces_available[key][value] + for participant_x in participants: + nonce = nonces_to_use[participant_x] + del self.nonces_available[participant_x][nonce] return sig_share - def combine_shares(self, shares, msg, nonces_to_use): + def combine_sig_shares(self, sig_shares, msg, nonces_to_use, tweak=None): """Convenience method to return a Schnorr Signature once - the participants have returned their shares""" - r = self.compute_group_r(msg, nonces_to_use) - s = sum(shares) % N - return SchnorrSignature.parse(r.xonly() + int_to_big_endian(s, 32)) + the participants have returned their sig_shares""" + group_r = self.compute_group_r(msg, nonces_to_use) + if tweak: + t = big_endian_to_int(tweak) + group_pubkey = self.group_pubkey + t + else: + group_pubkey = self.group_pubkey + challenge = big_endian_to_int( + hash_challenge(group_r.xonly() + group_pubkey.xonly() + msg) + ) + # check that the sig_shares from each participant validates + participants = sorted(sig_shares.keys()) + for participant_x in participants: + lagrange = self.lagrange_coefficient(participant_x, participants) + sig_share = sig_shares[participant_x] + d, e = nonces_to_use[participant_x] + if tweak: + if group_pubkey.parity: + pubkey = -1 * (self.pubkeys[participant_x] + t) + else: + pubkey = self.pubkeys[participant_x] + t + else: + pubkey = self.pubkeys[participant_x] + working_pubkey = lagrange * pubkey + partial_r = self.compute_partial_r(participant_x, msg, d, e) + commitment = challenge * working_pubkey + if group_r.parity: + partial_r = -1 * partial_r + if -1 * commitment + sig_share != partial_r: + raise RuntimeError("share didn't validate") + # combine now + s = sum(sig_shares.values()) % N + return SchnorrSignature.parse(group_r.xonly() + int_to_big_endian(s, 32)) + + def enrolment_round_1(self, participant_xs, new_participant_x): + """enrolment is the act of adding a new participant, which turns + t-of-n to t-of-n+1.""" + # we calculate the value we need to send to the new participant + y_value = self.lagrange_y_value(participant_xs, new_participant_x) + # we split the y value into N shares first so the new participant + # can't derive our secret + self.share_of_shares = {x: randbelow(N) for x in participant_xs[:-1]} + last_value = (y_value - sum(self.share_of_shares.values())) % N + self.share_of_shares[participant_xs[-1]] = last_value + self.enrolment_round_1_values = {} + + def enrolment_round_1_send(self, x): + """We communicate the share to the appropriate participant""" + return self.share_of_shares.get(x) + + def enrolment_round_1_receive(self, x, value): + """We receive our share of the new participant's secret from x""" + self.enrolment_round_1_values[x] = value + + def enrolment_round_2_send(self): + """We now send our accumulated shares of the new participant's + y-value""" + return sum(self.enrolment_round_1_values.values()) % N + + def add_participant(self, x, pubkey, nonce_pubs): + """Add a new participant (make into n+1)""" + self.participants.append(x) + self.n = len(self.participants) + self.pubkeys[x] = pubkey + self.register_nonces(x, nonce_pubs) + + def remove_participant(self, x): + """Remove an existing participant (make into n-1)""" + self.participants.remove(x) + self.n = len(self.participants) + del self.pubkeys[x] + del self.nonces_available[x] + + def polynomial(self): + return self.private_polynomial.public + + def key_update_round_1(self): + """Once you've removed a participant, it's important to update the + polynomial so that we invalidate the removed participant's share""" + if self.private_polynomial is not None: + raise ValueError("secrets have already been defined") + # generate a private Shamir polynomial without a constant term + self.private_polynomial = UpdatePrivatePolynomial.generate(self.t) + # the share that we generated for ourselves needs to be registered + self.my_shares = {self.x: self.private_polynomial.y_value(self.x)} + self.public_polynomials = {self.x: self.polynomial()} + + def key_update_round_1_register(self, participant_x, public_polynomial): + """store the update public polynomial for the participant at x""" + if participant_x == self.x: + return + self.public_polynomials[participant_x] = public_polynomial + + def key_update_round_2(self, participant_x): + """Deal out share to a fellow participant""" + return self.private_polynomial.y_value(participant_x) + + def key_update_round_2_register(self, x, share): + """Register our share from x so we can add them together later""" + if x == self.x: + return + # check our share against the commitments this participant has made + pubkey = self.public_polynomials[x].y_value(self.x) + # the result should correspond to the share we got + if pubkey != share * G: + raise RuntimeError("share does not correspond to the commitment") + # share is valid, we store it for later processing + self.my_shares[x] = share + + def update_keys(self): + """Now update the pubkeys for each participant and the secret share for + our pubkey""" + # we go through each participant to compute their pubkeys + # this is done by adding up the y values at x for every public polynomial + # whose sum is the group public polynomial + if self.secret * G != self.pubkey: + raise RuntimeError("something wrong with the secret") + # update everyone's pubkeys by the amount of the update polynomials + for x in self.participants: + points = [p.y_value(x) for p in self.public_polynomials.values()] + self.pubkeys[x] = S256Point.combine([self.pubkeys[x]] + points) + # delete the public polynomials and our private polynomial as we don't need them + self.public_polynomials = None + self.private_polynomial = None + # the secret shares that were dealt to us, we now combine to add to the secret + self.secret = (self.secret + sum(self.my_shares.values())) % N + # delete the shares as we now don't need them + self.my_shares = None + # sanity check against the public key we computed + self.pubkey = self.pubkeys[self.x] + if self.secret * G != self.pubkey: + raise RuntimeError("something wrong with the secret") diff --git a/buidl/test/test_frost.py b/buidl/test/test_frost.py index ea62f52..6a9df1b 100644 --- a/buidl/test/test_frost.py +++ b/buidl/test/test_frost.py @@ -1,49 +1,149 @@ from itertools import combinations from unittest import TestCase +from buidl.ecc import N from buidl.frost import FrostParticipant -from buidl.helper import sha256 +from buidl.hd import HDPrivateKey +from buidl.tx import Tx, TxIn, TxOut class FrostTest(TestCase): def test_frost(self): # create a three participant frost tests = [ - (1, 2), + # (1, 2), + (3, 3), (2, 3), (3, 5), - (4, 7), - (5, 9), - (3, 3), (4, 8), + (5, 9), ] for t, n in tests: - participants = [FrostParticipant(t, n, i) for i in range(n)] - round_1_data = [] + p_xs = [x for x in range(1, n + 1)] + participants = [] + for x in p_xs: + hd_priv = HDPrivateKey.from_mnemonic( + "oil oil oil oil oil oil oil oil oil oil oil oil", + password=f"FROST test {t}-of-{n} at {x}".encode("ascii"), + network="signet", + ) + participants.append(FrostParticipant(t, p_xs, x, hd_priv=hd_priv)) + round_1_data = {} key_name = b"test" for p in participants: - round_1_data.append(p.key_generation_round_1(key_name)) + round_1_data[p.x] = p.key_generation_round_1(key_name) for p in participants: - for i in range(n): - p.verify_round_1(key_name, i, *round_1_data[i]) - for i, p in enumerate(participants): - for j, share in enumerate(p.key_generation_round_2()): - participants[j].verify_round_2(i, share) + for x in p_xs: + p.verify_round_1(key_name, x, *round_1_data[x]) + for p1 in participants: + for p2 in participants: + share = p1.key_generation_round_2(p2.x) + p2.verify_round_2(p1.x, share) for p in participants: group_pubkey = p.compute_keys() self.assertFalse(group_pubkey.parity) combos = combinations(participants, t) num_nonces = len([0 for _ in combos]) - nonce_pubs = [] + num_nonces += num_nonces * n // (n - t + 1) + nonce_pubs_lookup = {} + for p in participants: + hd_priv = HDPrivateKey.from_mnemonic( + "oil oil oil oil oil oil oil oil oil oil oil oil", + password=f"FROST test {t}-of-{n} at {p.x} nonces".encode("ascii"), + network="signet", + ) + nonce_pubs_lookup[p.x] = p.extract_nonce_pairs(hd_priv, num_nonces) + for p in participants: + for x, nonce_pubs in nonce_pubs_lookup.items(): + p.register_nonces(x, nonce_pubs) + tweak = group_pubkey.tweak() + tweaked_pubkey = group_pubkey.tweaked_key() + for ps in combinations(participants, t): + prev_tx = bytes.fromhex( + "66ee1cd94dde93df1b765f6ba5eecb74a5e9d14f901ae85c3e87a0645fc96bad" + ) + prev_index = 0 + tx_in = TxIn(prev_tx, prev_index) + script_pubkey = group_pubkey.p2tr_script(tweak=tweak) + tx_in._value = 10000 + tx_in._script_pubkey = script_pubkey + amount = 9000 + tx_out = TxOut(amount, script_pubkey) + tx_obj = Tx(1, [tx_in], [tx_out], 0, segwit=True, network="signet") + msg = tx_obj.sig_hash_bip341(0) + nonces_to_use = {p.x: nonce_pubs_lookup[p.x].pop() for p in ps} + sig_shares = {} + for p in ps: + sig_shares[p.x] = p.sign(msg, nonces_to_use, tweak) + schnorr_sig = ps[0].combine_sig_shares( + sig_shares, msg, nonces_to_use, tweak + ) + self.assertTrue(tweaked_pubkey.verify_schnorr(msg, schnorr_sig)) + tx_obj.tx_ins[0].finalize_p2tr_keypath(schnorr_sig.serialize()) + self.assertTrue(tx_obj.verify()) + new_x = n + 1 + secret = 0 + p_xs = [x for x in range(1, t + 1)] + for p in participants[:t]: + p.enrolment_round_1(p_xs, new_x) + for p in participants[:t]: + for p2 in participants[:t]: + val = p.enrolment_round_1_send(p2.x) + p2.enrolment_round_1_receive(p.x, val) + for p in participants[:t]: + secret = (secret + p.enrolment_round_2_send()) % N + new_participant = FrostParticipant( + t, p_xs + [new_x], new_x, secret=secret, group_pubkey=group_pubkey + ) + hd_priv = HDPrivateKey.from_mnemonic( + "oil oil oil oil oil oil oil oil oil oil oil oil", + password=f"FROST test {t}-of-{n} at {n+1} nonces".encode("ascii"), + ) + new_nonces = new_participant.extract_nonce_pairs(hd_priv, num_nonces) + nonce_pubs_lookup[new_x] = new_nonces + for p in participants: + p.add_participant(new_x, new_participant.pubkey, new_nonces) + pubkeys = participants[0].pubkeys.copy() + new_participant.pubkeys = pubkeys.copy() + for x, nonce_pubs in nonce_pubs_lookup.items(): + new_participant.register_nonces(x, nonce_pubs) + participants.append(new_participant) + for p in participants[1:]: + p.remove_participant(1) + participants = participants[1:] + for p in participants: + p.key_update_round_1() + for p in participants: + polynomial = p.polynomial() + for p2 in participants: + p2.key_update_round_1_register(p.x, polynomial) for p in participants: - nonce_pubs.append(p.generate_nonce_pairs(num_nonces)) + for p2 in participants: + share = p.key_update_round_2(p2.x) + p2.key_update_round_2_register(p.x, share) for p in participants: - p.register_nonce_pubs(nonce_pubs) - msg = sha256(b"I am testing FROST") + p.update_keys() for ps in combinations(participants, t): - nonces_to_use = {p.index: nonce_pubs[p.index].pop() for p in ps} - shares = [] + print([p.x for p in ps]) + prev_tx = bytes.fromhex( + "66ee1cd94dde93df1b765f6ba5eecb74a5e9d14f901ae85c3e87a0645fc96bad" + ) + prev_index = 0 + tx_in = TxIn(prev_tx, prev_index) + script_pubkey = group_pubkey.p2tr_script(tweak=tweak) + tx_in._value = 10000 + tx_in._script_pubkey = script_pubkey + amount = 9000 + tx_out = TxOut(amount, script_pubkey) + tx_obj = Tx(1, [tx_in], [tx_out], 0, segwit=True, network="signet") + msg = tx_obj.sig_hash_bip341(0) + nonces_to_use = {p.x: nonce_pubs_lookup[p.x].pop() for p in ps} + sig_shares = {} for p in ps: - shares.append(p.sign(msg, nonces_to_use)) - schnorr_sig = ps[0].combine_shares(shares, msg, nonces_to_use) - self.assertTrue(group_pubkey.verify_schnorr(msg, schnorr_sig)) + sig_shares[p.x] = p.sign(msg, nonces_to_use, tweak) + schnorr_sig = ps[0].combine_sig_shares( + sig_shares, msg, nonces_to_use, tweak + ) + self.assertTrue(tweaked_pubkey.verify_schnorr(msg, schnorr_sig)) + tx_obj.tx_ins[0].finalize_p2tr_keypath(schnorr_sig.serialize()) + self.assertTrue(tx_obj.verify())