From a911008af18b009ee2237fbf8437be1802c31d3c Mon Sep 17 00:00:00 2001 From: Casey Colley Date: Tue, 21 Oct 2025 17:57:12 -0700 Subject: [PATCH 1/3] improvements to three LRA algorithms added convergence steps, implemented proper broadcasting where appropriate, removed hardcoded values for e.g. initializing arrays, removed hardcoded model and sbox definition, now uses configurable models, generalized variable names towards model positions, in-lined various functions into LRA() classes, removed self.samples_range option (prefer container.slab), added option for bias bit to all algos, added normalize option to lra.py, added new classes to model_values to support these changes: MultiModel to calculate against several models at once, Sbox (e.g. instead of SboxWeight), ModelBits to use unpacked model output, Added __len__ method to ModelValue which abstracts multi_model --- src/scarr/engines/lra.py | 223 ++++++++++------------ src/scarr/engines/lra_i.py | 249 ++++++++++--------------- src/scarr/engines/lra_wht.py | 258 ++++++++++++-------------- src/scarr/model_values/model_bits.py | 53 ++++++ src/scarr/model_values/model_value.py | 3 + src/scarr/model_values/multi_model.py | 47 +++++ src/scarr/model_values/sbox.py | 36 ++++ src/scarr/model_values/utils.py | 5 +- 8 files changed, 456 insertions(+), 418 deletions(-) create mode 100644 src/scarr/model_values/model_bits.py create mode 100644 src/scarr/model_values/multi_model.py create mode 100644 src/scarr/model_values/sbox.py diff --git a/src/scarr/engines/lra.py b/src/scarr/engines/lra.py index 76f6fa0..6da2236 100644 --- a/src/scarr/engines/lra.py +++ b/src/scarr/engines/lra.py @@ -5,12 +5,14 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. + from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits from multiprocessing.pool import Pool import numpy as np import os -import matplotlib.pyplot as plt # Class to compute average traces class AverageTraces: @@ -31,163 +33,128 @@ def get_data(self): avdata_snap = np.flatnonzero(self.counters) avtraces_snap = self.avtraces[avdata_snap] return avdata_snap, avtraces_snap - - -# Function to compute S-box output -def s_box_out(data, key_byte, sbox): - s_box_in = data ^ key_byte - return sbox[s_box_in] - - -# Linear regression analysis (LRA) for each byte of the key -def lra(data, traces, intermediate_fkt, sbox, sst): - num_traces, trace_length = traces.shape - SSR = np.empty((256, trace_length)) - all_coefs = np.empty((256, 9, trace_length)) - - for key_byte in np.arange(256, dtype='uint8'): - intermediate_var = intermediate_fkt(data, key_byte, sbox) - M = np.array(list(map(wrapper(8), intermediate_var))) - - P = (np.linalg.inv(M.T @ M)) @ M.T - beta = P @ traces - E = M @ beta - - SSR[key_byte] = np.sum((E - traces) **2 , axis=0) - - all_coefs[key_byte,:] = beta[:] - - R2 = 1 - SSR / sst[None, :] - return R2, all_coefs - - -# Function to generate a model of bits -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model +class LRA(Engine): + def __init__(self, model_value: ModelValue, convergence_step=None, normalize=False, bias=True) -> None: + self.normalize = normalize + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] -# Wrapper function to map a model to an intermediate variable -def wrapper(y): - def curry(x): - return model_single_bits(x, y) - return curry + super().__init__(ModelBits(model_value, bias)) + def get_R2s(self): + return self.R2s -# Function to plot traces -def plot_traces(traces, nb): - for i in range(nb): - plt.plot(traces[i]) - plt.show() + def get_betas(self): + return self.betas + def get_results(self): + return self.results -def fixed_sst(traces): - num_traces, trace_length = traces.shape - u = np.zeros(trace_length) - v = np.zeros(trace_length) - for i in range(num_traces): - u += traces[i] - v += traces[i]**2 - return np.subtract(v,np.square(u)/num_traces) + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] -class LRA(Engine): - def __init__(self, key_bytes=np.arange) -> None: - self.key_bytes = key_bytes - self.samples_len = 0 - self.traces_len = 0 - self.batch_size = 0 - self.batches_num = 0 - - self.average_traces = [] - self.aes_key = [] - - self.samples_range = None - self.samples_start = self.samples_end = 0 - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - - def update(self, traces: np.ndarray, plaintext: np.ndarray , average_traces): + def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces): for i in range(traces.shape[0]): average_traces.add_trace(plaintext[i], traces[i]) return average_traces - - def calculate(self, byte_idx, average_traces): - plain, trace = average_traces.get_data() - sst = fixed_sst(trace) - r2, coefs = lra(plain, trace, s_box_out, self.sbox, sst) - r2_peaks = np.max(r2, axis=1) - winning_byte = np.argmax(r2_peaks) - - print(f"Key Byte {byte_idx}: {winning_byte:02x}, Max R2: {np.max(r2_peaks):.5f}") - return winning_byte + def calculate(self, average_traces, model): + plain, traces = average_traces.get_data() + num_traces, trace_length = traces.shape + model_vals = model.shape[0] + + SST = np.sum(np.square(traces), axis=0) - np.square(np.sum(traces, axis=0))/num_traces + SSR = np.empty((model_vals, trace_length)) + + # Linear regression analysis (LRA) for each model position + P = np.linalg.pinv(model) + betas = P @ traces + # Below loop is equivalent to: + # E = model @ beta + # SSR = np.sum((E - traces)**2, axis=1) + # However this takes too much memory + for i in range(0, betas.shape[-1], step := 1): + E = model @ betas[..., i:i+step] + SSR[..., i:i+step] = np.sum((E - traces[:, i:i+step])**2, axis=1) + + R2s = 1 - SSR / SST[None, :] + if self.normalize: # Normalization + R2s = (R2s - np.mean(R2s, axis=0, keepdims=True)) / np.std(R2s, axis=0, keepdims=True) + return R2s, betas - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range + def find_candidate(self, R2s): + r2_peaks = np.max(R2s, axis=1) + winning_candidate = np.argmax(r2_peaks) + return winning_candidate - self.average_traces = [AverageTraces(256, self.samples_range) for _ in range(len(container.model_positions))] # all key bytes - self.aes_key = [[] for _ in range(len(container.tiles))] + def run(self, container): + self.populate(container) with Pool(processes=int(os.cpu_count()/2)) as pool: workload = [] for tile in container.tiles: (tile_x, tile_y) = tile for model_pos in container.model_positions: - workload.append((self, container, self.average_traces[model_pos], tile_x, tile_y, model_pos)) + workload.append((self, container, tile_x, tile_y, model_pos)) starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) pool.close() pool.join() - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: + for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: + self.R2s[model_pos] = r2s + self.betas[model_pos] = betas tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - + self.results[tile_index].append(candidate) @staticmethod - def run_workload(self, container, average_traces, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, [model_pos]) + def run_workload(self, container, tile_x, tile_y, model_pos): + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf - for batch in container.get_batches(tile_x, tile_y): - average_traces = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], average_traces) + model_vals = self.model_value.model.num_vals + average_traces = AverageTraces(model_vals, container.sample_length) - key_byte = self.calculate(model_pos, average_traces) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) + + traces_processed = 0 + converge_index = 0 + for batch in container.get_batches(tile_x, tile_y): + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + average_traces = self.update(traces, plaintext, average_traces) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/engines/lra_i.py b/src/scarr/engines/lra_i.py index d664595..8523d9c 100644 --- a/src/scarr/engines/lra_i.py +++ b/src/scarr/engines/lra_i.py @@ -5,120 +5,48 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. + from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits import numpy as np from multiprocessing.pool import Pool import os -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model - - -def model_mean(byte_idx, model, group_n): - return np.average(model, axis=0, weights=group_n[:]) - - -def model_cov(byte_idx, model, mod_mean, group_n): - return np.cov(model - mod_mean[None,:], rowvar=False, - fweights=group_n[:]) - - -def covar_with_model(byte_idx, model, mod_mean, grouped_mean, ungrouped_mean, n_traces, group_n): - signal_diff = (grouped_mean[:,:] - - ungrouped_mean[None,:]) - model_diff = model - mod_mean[None,:] - return np.tensordot(model_diff * group_n[:,None], signal_diff, - axes=(0, 0)) / (n_traces - 1) - - -def ungrouped_stats(grouped_mean, grouped_var, group_n): - gm0 = grouped_mean - gn0 = group_n - mu = np.average(gm0, axis=0, weights=gn0) - var = (np.sum((gm0 - mu[None, :]) ** 2 * gn0[:, None], axis=0) + - np.sum(grouped_var[0] * (gn0 - 1)[:, None], axis=0)) - var /= max(np.sum(gn0) - 1, 1) - return mu, var - - -def lra(byte_idx, model_input_sbox, ungrouped_var, group_n, grouped_mean, ungrouped_mean, n_traces, n_points): - model_params = np.zeros((256, 16, n_points)) - total_rsq = np.zeros(256) - - for key_byte in range(256): - model_input_ptxt = model_input_sbox[np.arange(256) ^ key_byte] - mod_mean = model_mean(byte_idx, model_input_ptxt, group_n) - mod_cov = model_cov(byte_idx, model_input_ptxt, mod_mean, group_n) - mod_invcov = np.linalg.inv(mod_cov) - - cov = covar_with_model(byte_idx, model_input_ptxt, mod_mean, grouped_mean, ungrouped_mean, n_traces, group_n) - - model_params[key_byte,:,:] = np.matmul(mod_invcov, cov); - - # Total coefficient of determination, over all sample points. - total_rsq[key_byte] = ( - np.tensordot(cov / ungrouped_var[None,:], - model_params[key_byte,:,:], - axes=((0, 1), (0, 1)))) - - key_byte = np.argmax(total_rsq) - real_params = model_params[key_byte,:,:] - return key_byte, total_rsq, real_params - - class LRA(Engine): - def __init__(self): - self.key_bytes = np.arange(16) - self.aes_key = [] - self.samples_range = None - self.samples_start = self.samples_end = 0 - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - self.model_input = np.zeros((256, 16), dtype=np.float64) - for i in range(256): - self.model_input[i, :8] = [(i >> j) & 1 for j in range(8)] - self.model_input[i, 8:] = [(self.sbox[i] >> j) & 1 for j in range(8)] - - self.group_n = None - self.group_mean = None - self.group_M = None - - # Load traces and plaintext data - def load_data(self, container, i, model_pos): - batch = container.get_batch_index(i) - if not batch: - return None - trange = batch[-1][:, 1000:2000].astype(np.float64) - plaintext = batch[0] - return trange, plaintext - - - # Load data for batches + def __init__(self, model_value: ModelValue, convergence_step=None, bias=True) -> None: + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + def update(self, traces: np.ndarray, plaintext: np.ndarray, group_n, group_mean, group_M): num_traces, num_samples = traces.shape @@ -134,33 +62,37 @@ def update(self, traces: np.ndarray, plaintext: np.ndarray, group_n, group_mean, return group_n, group_mean, group_M - - def calculate(self, byte_idx, group_n, group_mean, group_M): - group_var = np.nan_to_num( - group_M / np.clip(group_n[:, None] - 1, 1, None)) - - un_mu, un_var = ungrouped_stats(group_mean, group_var, group_n) - n_traces = np.sum(group_n) - n_points = group_var.shape[1] - - k, r2, _ = lra(byte_idx, self.model_input, un_var, - group_n, group_mean, un_mu, - n_traces, n_points) - print("Key Byte {:2d}: {:02x}, Max R2: {:.5f}".format( - byte_idx, k, np.max(r2))) - return k - - - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range - - self.aes_key = [[] for _ in range(len(container.tiles))] + def calculate(self, grouped_n, grouped_mean, grouped_sumsqdiff, model): + # Using sums of sq. differences instead of co/var eliminates divisions that would cancel out in the end + def _ungrouped_stats(grouped_mean, grouped_sumsqdiff, group_n): + ungrouped_mean = np.average(grouped_mean, axis=0, weights=group_n) + between = np.sum((grouped_mean - ungrouped_mean) ** 2 * group_n[..., None], axis=0) + within = np.sum(grouped_sumsqdiff, axis=0) + ungrouped_sumsqdiff = between + within + return ungrouped_mean, ungrouped_sumsqdiff + + num_traces, trace_length = np.sum(grouped_n), grouped_sumsqdiff.shape[1] + ungrouped_mean, ungrouped_sumsqdiff = _ungrouped_stats(grouped_mean, grouped_sumsqdiff, grouped_n) + + mod_mean = np.average(model, axis=1, weights=grouped_n) + model_diff = model - mod_mean + model_sumsqdiff = ((model_diff.swapaxes(-1, -2) * grouped_n) @ model_diff) + mod_invcov = np.linalg.inv(model_sumsqdiff) + signal_diff = grouped_mean - ungrouped_mean + combined_sumsqdiff = ((model_diff.swapaxes(-1, -2) * grouped_n) @ signal_diff) + + betas = np.matmul(mod_invcov, combined_sumsqdiff) + r2s = np.sum(np.divide((combined_sumsqdiff * betas), ungrouped_sumsqdiff), axis=1) + + return r2s, betas + + def find_candidate(self, R2s): + aggregated = np.sum(R2s, axis=1) + winning_candidate = np.argmax(aggregated) + return winning_candidate + + def run(self, container): + self.populate(container) with Pool(processes=int(os.cpu_count()/2)) as pool: workload = [] @@ -172,25 +104,44 @@ def run(self, container, samples_range=None): pool.close() pool.join() - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: + for tile_x, tile_y, model_pos, tmp_key_byte, tmp_r2s, tmp_betas in starmap_results: + self.R2s[model_pos] = tmp_r2s + self.betas[model_pos] = tmp_betas tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - + self.results[tile_index].append(tmp_key_byte) @staticmethod def run_workload(self, container, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, model_pos) - group_n = np.zeros((256), dtype=np.uint32) - group_mean = np.zeros((256, self.samples_range), dtype=np.float64) + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = self.model_value.model.num_vals + group_n = np.zeros((model_vals), dtype=np.uint32) + group_mean = np.zeros((model_vals, container.sample_length), dtype=np.float64) group_M = np.zeros_like(group_mean) - for batch in container.get_batches(tile_x, tile_y): - (group_n, group_mean, group_M) = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], group_n, group_mean, group_M) + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) - key_byte = self.calculate(model_pos, group_n, group_mean, group_M) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file + traces_processed = 0 + converge_index = 0 + for batch in container.get_batches(tile_x, tile_y): + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(group_n, group_mean, group_M, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + group_n, group_mean, group_M = self.update(traces, plaintext, group_n, group_mean, group_M) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(group_n, group_mean, group_M, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/engines/lra_wht.py b/src/scarr/engines/lra_wht.py index afbf1be..cce8599 100644 --- a/src/scarr/engines/lra_wht.py +++ b/src/scarr/engines/lra_wht.py @@ -5,7 +5,10 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. + from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits import numpy as np from multiprocessing.pool import Pool import os @@ -13,6 +16,7 @@ from scipy.linalg import hadamard from scipy.linalg import solve_triangular + class AverageTraces: def __init__(self, num_values, trace_length): self.avtraces = np.zeros((num_values, trace_length)) @@ -31,171 +35,147 @@ def get_data(self): avdata_snap = np.flatnonzero(self.counters) avtraces_snap = self.avtraces[avdata_snap] return avdata_snap, avtraces_snap - - -# Function to compute S-box output -def s_box_out(data, key_byte, sbox): - s_box_in = data ^ key_byte - return sbox[s_box_in] - - -def WHT(x): - n = x.shape[0] - H = np.array(hadamard(n)) - return H @ x / n - - -def iWHT(x): - n = x.shape[0] - H = np.array(hadamard(n)) - return H @ x - - -# Linear regression analysis (LRA) for each byte of the key -def lra(data, traces, sbox, sst): - num_traces, trace_length = traces.shape - R2 = np.empty((256, trace_length)) - - intermediate_var = s_box_out(data, 0, sbox) - M0 = np.array(list(map(wrapper(8), intermediate_var))) - - Tm = np.linalg.cholesky(M0.T @ M0) - U0 = solve_triangular(Tm.T, M0.T, lower=True) - - U0_wht = np.zeros_like(U0) - U0_iwht = np.zeros_like(U0_wht) - for p in range(U0.shape[0]): - U0_wht[p,:] = WHT(U0[p,:]) - - for i in range(trace_length): - WL = WHT(traces[:, i]) - for p in range(U0_wht.shape[0]-1): - U0_iwht[p,:] = iWHT(U0_wht[p,:] * WL[:]) - for key_byte in range(256): - SSR = np.sum((U0_iwht[:-1,key_byte])**2) - R2[key_byte,i] = SSR / sst[i] - - return R2 - - -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model - - -def wrapper(y): - def curry(x): - return model_single_bits(x, y) - return curry class LRA(Engine): - def __init__(self, key_bytes=np.arange): - self.key_bytes = key_bytes - self.samples_len = self.traces_len = 0 - self.batch_size = self.batches_num = 0 - self.average_traces = [] - self.aes_key = [] - self.samples_range = None - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - - def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces, n_tr, v, u): + def __init__(self, model_value: ModelValue, convergence_step=None, bias=True) -> None: + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + + def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces, n_traces, sumsq, sums): plaintext = plaintext.reshape(-1) for i in range(traces.shape[0]): t = traces[i] g = int(plaintext[i]) average_traces.add_trace(g, t) - u += t - v += t * t - n_tr += 1 - return average_traces, n_tr, v, u + sums += np.sum(traces, axis=0) + sumsq += np.sum(traces**2, axis=0) + n_traces += traces.shape[0] + return average_traces, n_traces, sumsq, sums - def calculate(self, byte_idx, average_traces, n_tr, v, u): - plain, trace = average_traces.get_data() - sst = v - (u ** 2) / n_tr - r2 = lra(plain, trace, self.sbox, sst) - r2_peaks = np.max(r2, axis=1) - winning_byte = int(np.argmax(r2_peaks)) + def calculate(self, average_traces, n_traces, sumsq, sums, model): + # Walsh-Hadamard Transforms + def _WHT(x): + n = x.shape[-2] + H = np.array(hadamard(n)) + return H @ x / n + def _iWHT(x): + n = x.shape[-2] + H = np.array(hadamard(n)) + return H @ x - print(f"Key Byte {byte_idx}: {winning_byte:02x}") - return winning_byte + plain, traces = average_traces.get_data() + num_traces, trace_length = traces.shape + model_vals = model.shape[0] + SST = sumsq - (sums ** 2) / num_traces + SSR = np.empty((model_vals, trace_length)) - def finalize(self): - pass - - - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range - - self.average_traces = [AverageTraces(256, self.samples_range) for _ in container.model_positions] # all key bytes - self.aes_key = [[] for _ in range(len(container.tiles))] + R2s = np.empty((model_vals, trace_length)) + + M0 = model[0].astype(np.uint32) + Tm = np.linalg.cholesky(M0.T @ M0) + U0 = solve_triangular(Tm.T, M0.T, lower=True) + + U0_wht = _WHT(U0.T) + + WL = _WHT(traces) + U0_iwht = _iWHT(WL.T[..., None] * U0_wht) + + betas = U0_iwht.transpose(1, 2, 0) + SSR = np.sum(betas**2, axis=1) + R2s = SSR / SST + + return R2s, betas + + def find_candidate(self, R2s): + r2_peaks = np.max(R2s, axis=1) + winning_candidate = int(np.argmax(r2_peaks)) + return winning_candidate + + def run(self, container): + self.populate(container) with Pool(processes=int(os.cpu_count()/2)) as pool: workload = [] for tile in container.tiles: (tile_x, tile_y) = tile for model_pos in container.model_positions: - workload.append((self, container, self.average_traces[model_pos], tile_x, tile_y, model_pos)) + workload.append((self, container, tile_x, tile_y, model_pos)) starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) pool.close() pool.join() - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: + for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: + self.R2s[model_pos] = r2s + self.betas[model_pos] = betas tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - + self.results[tile_index].append(candidate) @staticmethod - def run_workload(self, container, average_traces, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, [model_pos]) - v = np.zeros(self.samples_range) - u = np.zeros(self.samples_range) - n_tr = 0 - + def run_workload(self, container, tile_x, tile_y, model_pos): + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = self.model_value.model.num_vals + average_traces = AverageTraces(model_vals, container.sample_length) + sumsq = np.zeros(container.sample_length) + sums = np.zeros(container.sample_length) + n_traces = 0 + + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) + + traces_processed = 0 + converge_index = 0 for batch in container.get_batches(tile_x,tile_y): - (average_traces, tmp_n_tr, tmp_v, tmp_u) = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], average_traces, n_tr, v, u) - v += tmp_v - u += tmp_u - n_tr += tmp_n_tr - - key_byte = self.calculate(model_pos, average_traces, n_tr, v, u) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(average_traces, n_traces, sumsq, sums, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + average_traces, n_traces, sumsq, sums = self.update(traces, plaintext, average_traces, n_traces, sumsq, sums) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(average_traces, n_traces, sumsq, sums, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/model_values/model_bits.py b/src/scarr/model_values/model_bits.py new file mode 100644 index 0000000..50cb22c --- /dev/null +++ b/src/scarr/model_values/model_bits.py @@ -0,0 +1,53 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +import numpy as np +from .guess_model_value import GuessModelValue + + +class ModelBits(GuessModelValue): + + def __init__(self, model, bias=True) -> None: + self.num_vals = 2 + self.vals = np.arange(2) + self.num_bits = (model.num_vals-1).bit_length() * len(model) + (1 if bias else 0) + self.model = model + self.bias = bias + + def calculate(self, batch): + outputs = self.model.calculate(batch) + + unpacked = np.unpackbits(outputs[..., np.newaxis], axis=-1, bitorder='little') + + if self.bias: + bias_bits = np.ones(unpacked.shape[:-1]+(1,), dtype='uint8') + unpacked = np.concatenate((unpacked, bias_bits), axis=-1) + + return unpacked + + def calculate_table(self, batch): + outputs = self.model.calculate_table(batch) + + unpacked = np.unpackbits(np.atleast_3d(outputs), axis=-1, bitorder='little') + + if self.bias: + bias_bits = np.ones(unpacked.shape[:-1]+(1,), dtype='uint8') + unpacked = np.concatenate((unpacked, bias_bits), axis=-1) + + return unpacked + + def calculate_all_tables(self, batch): + outputs = self.model.calculate_all_tables(batch) + + unpacked = np.unpackbits(outputs, axis=-2, bitorder='little') + if self.bias: + bias_shape = unpacked.shape[:-2] + (1,) + unpacked.shape[-1:] + bias_bits = np.ones(bias_shape, dtype=unpacked.dtype) + unpacked = np.concatenate((unpacked, bias_bits), axis=-2) + + return unpacked diff --git a/src/scarr/model_values/model_value.py b/src/scarr/model_values/model_value.py index 49b8fbe..2d7f863 100644 --- a/src/scarr/model_values/model_value.py +++ b/src/scarr/model_values/model_value.py @@ -13,5 +13,8 @@ class ModelValue: def __init__(self) -> None: pass + def __len__(self): + return 1 + def calculate(self, batch): pass diff --git a/src/scarr/model_values/multi_model.py b/src/scarr/model_values/multi_model.py new file mode 100644 index 0000000..ece78de --- /dev/null +++ b/src/scarr/model_values/multi_model.py @@ -0,0 +1,47 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +import numpy as np +from .model_value import ModelValue +from .guess_model_value import GuessModelValue + + +class MultiModel(GuessModelValue): + + def __init__(self, models) -> None: + self.models = models + # Expects that all models produce same val range + self.num_vals = models[0].num_vals + self.vals = models[0].vals + + def __len__(self): + return len(self.models) + + def calculate(self, batch): + all_models_output = self.models[0].calculate(batch)[np.newaxis, ...] + for m in self.models[1:]: + new_output = m.calculate(batch)[np.newaxis, ...] + all_models_output = np.concatenate((all_models_output), axis=0) + + return all_models_output + + def calculate_table(self, batch): + all_models_output = self.models[0].calculate_table(batch)[..., np.newaxis] + for m in self.models[1:]: + new_output = m.calculate_table(batch)[..., np.newaxis] + all_models_output = np.concatenate((all_models_output, new_output), axis=-1) + + return all_models_output + + def calculate_all_tables(self, batch): + all_models_output = self.models[0].calculate_all_tables(batch)[..., np.newaxis] + for m in self.models[1:]: + new_output = m.calculate_all_tables(batch)[..., np.newaxis] + all_models_output = np.concatenate((all_models_output, new_output), axis=-1) + + return all_models_output.swapaxes(-1, -2) diff --git a/src/scarr/model_values/sbox.py b/src/scarr/model_values/sbox.py new file mode 100644 index 0000000..a8338d3 --- /dev/null +++ b/src/scarr/model_values/sbox.py @@ -0,0 +1,36 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +import numpy as np +from .guess_model_value import GuessModelValue +from .utils import AES_SBOX, KEYS + + +class Sbox(GuessModelValue): + + def __init__(self) -> None: + self.num_vals = 256 + self.vals = np.arange(256) + + def calculate(self, batch): + inputs = np.bitwise_xor(np.squeeze(batch[0]), np.squeeze(batch[1]), dtype=np.uint8) + + return AES_SBOX[inputs] + + def calculate_table(self, batch): + inputs = np.bitwise_xor(np.squeeze(batch[0]), KEYS, dtype=np.uint8) + + return AES_SBOX[inputs] + + def calculate_all_tables(self, batch): + return np.apply_along_axis(self.calculate_all_tables_helper, axis=1, arr=batch[0]) + + def calculate_all_tables_helper(self, data): + inputs = np.bitwise_xor(data, KEYS, dtype=np.uint8) + + return AES_SBOX[inputs] diff --git a/src/scarr/model_values/utils.py b/src/scarr/model_values/utils.py index 215c00a..cc02bf1 100644 --- a/src/scarr/model_values/utils.py +++ b/src/scarr/model_values/utils.py @@ -8,7 +8,7 @@ import numpy as np # The key hypotheseses which are the hex values 0-255 -KEYS = np.arange(256).reshape(-1, 1) +KEYS = np.arange(256, dtype='uint8').reshape(-1, 1) # AES-128 sbox used to compute model values AES_SBOX = np.array([99,124,119,123,242,107,111,197,48,1,103,43,254,215,171,118, @@ -26,7 +26,8 @@ 186,120,37,46,28,166,180,198,232,221,116,31,75,189,139,138, 112,62,181,102,72,3,246,14,97,53,87,185,134,193,29,158, 225,248,152,17,105,217,142,148,155,30,135,233,206,85,40,223, - 140,161,137,13,191,230,66,104,65,153,45,15,176,84,187,22]) + 140,161,137,13,191,230,66,104,65,153,45,15,176,84,187,22], + dtype='uint8') # Hamming weights of the values 0-255 used for model values WEIGHTS = np.array([0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4, From 89d098bae6d2c59bdcd741981d6102f9f7b3df75 Mon Sep 17 00:00:00 2001 From: Casey Colley Date: Tue, 21 Oct 2025 18:26:46 -0700 Subject: [PATCH 2/3] CRLF to LF on lra.py --- src/scarr/engines/lra.py | 320 +++++++++++++++++++-------------------- 1 file changed, 160 insertions(+), 160 deletions(-) diff --git a/src/scarr/engines/lra.py b/src/scarr/engines/lra.py index 6da2236..2c04737 100644 --- a/src/scarr/engines/lra.py +++ b/src/scarr/engines/lra.py @@ -1,160 +1,160 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. -# -# This Source Code Form is "Incompatible With Secondary Licenses", as -# defined by the Mozilla Public License, v. 2.0. - - -from .engine import Engine -from ..model_values.model_value import ModelValue -from ..model_values.model_bits import ModelBits -from multiprocessing.pool import Pool -import numpy as np -import os - - -# Class to compute average traces -class AverageTraces: - def __init__(self, num_values, trace_length): - self.avtraces = np.zeros((num_values, trace_length)) - self.counters = np.zeros(num_values) - - # Method to add a trace and update the average - def add_trace(self, data, trace): - if self.counters[data] == 0: - self.avtraces[data] = trace - else: - self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data] - self.counters[data] += 1 - - # Method to get data with non-zero counters and corresponding average traces - def get_data(self): - avdata_snap = np.flatnonzero(self.counters) - avtraces_snap = self.avtraces[avdata_snap] - return avdata_snap, avtraces_snap - - -class LRA(Engine): - def __init__(self, model_value: ModelValue, convergence_step=None, normalize=False, bias=True) -> None: - self.normalize = normalize - - self.convergence_step = convergence_step - self.R2s = None - self.betas = None - self.results = [] - - super().__init__(ModelBits(model_value, bias)) - - def get_R2s(self): - return self.R2s - - def get_betas(self): - return self.betas - - def get_results(self): - return self.results - - def populate(self, container): - model_vals = self.model_value.model.num_vals - num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 - - # R2s and Beta values, Results - self.R2s = np.zeros((len(container.model_positions), - num_steps, - model_vals, - container.sample_length), dtype=np.float64) - self.betas = np.zeros((len(container.model_positions), - model_vals, - self.model_value.num_bits, - container.sample_length), dtype=np.float64) - self.results = [[] for _ in range(len(container.tiles))] - - def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces): - for i in range(traces.shape[0]): - average_traces.add_trace(plaintext[i], traces[i]) - - return average_traces - - def calculate(self, average_traces, model): - plain, traces = average_traces.get_data() - num_traces, trace_length = traces.shape - model_vals = model.shape[0] - - SST = np.sum(np.square(traces), axis=0) - np.square(np.sum(traces, axis=0))/num_traces - SSR = np.empty((model_vals, trace_length)) - - # Linear regression analysis (LRA) for each model position - P = np.linalg.pinv(model) - betas = P @ traces - # Below loop is equivalent to: - # E = model @ beta - # SSR = np.sum((E - traces)**2, axis=1) - # However this takes too much memory - for i in range(0, betas.shape[-1], step := 1): - E = model @ betas[..., i:i+step] - SSR[..., i:i+step] = np.sum((E - traces[:, i:i+step])**2, axis=1) - - R2s = 1 - SSR / SST[None, :] - if self.normalize: # Normalization - R2s = (R2s - np.mean(R2s, axis=0, keepdims=True)) / np.std(R2s, axis=0, keepdims=True) - - return R2s, betas - - def find_candidate(self, R2s): - r2_peaks = np.max(R2s, axis=1) - winning_candidate = np.argmax(r2_peaks) - return winning_candidate - - def run(self, container): - self.populate(container) - - with Pool(processes=int(os.cpu_count()/2)) as pool: - workload = [] - for tile in container.tiles: - (tile_x, tile_y) = tile - for model_pos in container.model_positions: - workload.append((self, container, tile_x, tile_y, model_pos)) - starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) - pool.close() - pool.join() - - for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: - self.R2s[model_pos] = r2s - self.betas[model_pos] = betas - tile_index = list(container.tiles).index((tile_x, tile_y)) - self.results[tile_index].append(candidate) - - @staticmethod - def run_workload(self, container, tile_x, tile_y, model_pos): - num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) - if self.convergence_step is None: - self.convergence_step = np.inf - - model_vals = self.model_value.model.num_vals - average_traces = AverageTraces(model_vals, container.sample_length) - - r2s = np.empty((num_steps, model_vals, container.sample_length)) - - model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] - hypotheses = self.model_value.calculate_table([model_input, None, None, None]) - - traces_processed = 0 - converge_index = 0 - for batch in container.get_batches(tile_x, tile_y): - if traces_processed >= self.convergence_step: - instant_r2s, _ = self.calculate(average_traces, hypotheses) - r2s[converge_index, :, :] = instant_r2s - traces_processed = 0 - converge_index += 1 - # Update - plaintext = batch[0] - traces = batch[-1] - average_traces = self.update(traces, plaintext, average_traces) - traces_processed += traces.shape[0] - - instant_r2s, betas = self.calculate(average_traces, hypotheses) - r2s[converge_index, :, :] = instant_r2s - candidate = self.find_candidate(r2s[-1, ...]) - - return tile_x, tile_y, model_pos, candidate, r2s, betas +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits +from multiprocessing.pool import Pool +import numpy as np +import os + + +# Class to compute average traces +class AverageTraces: + def __init__(self, num_values, trace_length): + self.avtraces = np.zeros((num_values, trace_length)) + self.counters = np.zeros(num_values) + + # Method to add a trace and update the average + def add_trace(self, data, trace): + if self.counters[data] == 0: + self.avtraces[data] = trace + else: + self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data] + self.counters[data] += 1 + + # Method to get data with non-zero counters and corresponding average traces + def get_data(self): + avdata_snap = np.flatnonzero(self.counters) + avtraces_snap = self.avtraces[avdata_snap] + return avdata_snap, avtraces_snap + + +class LRA(Engine): + def __init__(self, model_value: ModelValue, convergence_step=None, normalize=False, bias=True) -> None: + self.normalize = normalize + + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + + def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces): + for i in range(traces.shape[0]): + average_traces.add_trace(plaintext[i], traces[i]) + + return average_traces + + def calculate(self, average_traces, model): + plain, traces = average_traces.get_data() + num_traces, trace_length = traces.shape + model_vals = model.shape[0] + + SST = np.sum(np.square(traces), axis=0) - np.square(np.sum(traces, axis=0))/num_traces + SSR = np.empty((model_vals, trace_length)) + + # Linear regression analysis (LRA) for each model position + P = np.linalg.pinv(model) + betas = P @ traces + # Below loop is equivalent to: + # E = model @ beta + # SSR = np.sum((E - traces)**2, axis=1) + # However this takes too much memory + for i in range(0, betas.shape[-1], step := 1): + E = model @ betas[..., i:i+step] + SSR[..., i:i+step] = np.sum((E - traces[:, i:i+step])**2, axis=1) + + R2s = 1 - SSR / SST[None, :] + if self.normalize: # Normalization + R2s = (R2s - np.mean(R2s, axis=0, keepdims=True)) / np.std(R2s, axis=0, keepdims=True) + + return R2s, betas + + def find_candidate(self, R2s): + r2_peaks = np.max(R2s, axis=1) + winning_candidate = np.argmax(r2_peaks) + return winning_candidate + + def run(self, container): + self.populate(container) + + with Pool(processes=int(os.cpu_count()/2)) as pool: + workload = [] + for tile in container.tiles: + (tile_x, tile_y) = tile + for model_pos in container.model_positions: + workload.append((self, container, tile_x, tile_y, model_pos)) + starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) + pool.close() + pool.join() + + for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: + self.R2s[model_pos] = r2s + self.betas[model_pos] = betas + tile_index = list(container.tiles).index((tile_x, tile_y)) + self.results[tile_index].append(candidate) + + @staticmethod + def run_workload(self, container, tile_x, tile_y, model_pos): + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = self.model_value.model.num_vals + average_traces = AverageTraces(model_vals, container.sample_length) + + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) + + traces_processed = 0 + converge_index = 0 + for batch in container.get_batches(tile_x, tile_y): + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + average_traces = self.update(traces, plaintext, average_traces) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas From 3dc99ffd0e0d8194b5d3cbb46e3ae5c21418a0cf Mon Sep 17 00:00:00 2001 From: Casey Colley Date: Tue, 21 Oct 2025 18:37:38 -0700 Subject: [PATCH 3/3] fixed linter errors --- src/scarr/engines/lra_i.py | 1 - src/scarr/model_values/multi_model.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/scarr/engines/lra_i.py b/src/scarr/engines/lra_i.py index 8523d9c..534d8e6 100644 --- a/src/scarr/engines/lra_i.py +++ b/src/scarr/engines/lra_i.py @@ -71,7 +71,6 @@ def _ungrouped_stats(grouped_mean, grouped_sumsqdiff, group_n): ungrouped_sumsqdiff = between + within return ungrouped_mean, ungrouped_sumsqdiff - num_traces, trace_length = np.sum(grouped_n), grouped_sumsqdiff.shape[1] ungrouped_mean, ungrouped_sumsqdiff = _ungrouped_stats(grouped_mean, grouped_sumsqdiff, grouped_n) mod_mean = np.average(model, axis=1, weights=grouped_n) diff --git a/src/scarr/model_values/multi_model.py b/src/scarr/model_values/multi_model.py index ece78de..511fa86 100644 --- a/src/scarr/model_values/multi_model.py +++ b/src/scarr/model_values/multi_model.py @@ -26,7 +26,7 @@ def calculate(self, batch): all_models_output = self.models[0].calculate(batch)[np.newaxis, ...] for m in self.models[1:]: new_output = m.calculate(batch)[np.newaxis, ...] - all_models_output = np.concatenate((all_models_output), axis=0) + all_models_output = np.concatenate((all_models_output, new_output), axis=-1) return all_models_output