diff --git a/src/scarr/engines/lra.py b/src/scarr/engines/lra.py index 76f6fa0..2c04737 100644 --- a/src/scarr/engines/lra.py +++ b/src/scarr/engines/lra.py @@ -1,193 +1,160 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. -# -# This Source Code Form is "Incompatible With Secondary Licenses", as -# defined by the Mozilla Public License, v. 2.0. - -from .engine import Engine -from multiprocessing.pool import Pool -import numpy as np -import os - -import matplotlib.pyplot as plt - -# Class to compute average traces -class AverageTraces: - def __init__(self, num_values, trace_length): - self.avtraces = np.zeros((num_values, trace_length)) - self.counters = np.zeros(num_values) - - # Method to add a trace and update the average - def add_trace(self, data, trace): - if self.counters[data] == 0: - self.avtraces[data] = trace - else: - self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data] - self.counters[data] += 1 - - # Method to get data with non-zero counters and corresponding average traces - def get_data(self): - avdata_snap = np.flatnonzero(self.counters) - avtraces_snap = self.avtraces[avdata_snap] - return avdata_snap, avtraces_snap - - -# Function to compute S-box output -def s_box_out(data, key_byte, sbox): - s_box_in = data ^ key_byte - return sbox[s_box_in] - - -# Linear regression analysis (LRA) for each byte of the key -def lra(data, traces, intermediate_fkt, sbox, sst): - num_traces, trace_length = traces.shape - SSR = np.empty((256, trace_length)) - all_coefs = np.empty((256, 9, trace_length)) - - for key_byte in np.arange(256, dtype='uint8'): - intermediate_var = intermediate_fkt(data, key_byte, sbox) - M = np.array(list(map(wrapper(8), intermediate_var))) - - P = (np.linalg.inv(M.T @ M)) @ M.T - - beta = P @ traces - - E = M @ beta - - SSR[key_byte] = np.sum((E - traces) **2 , axis=0) - - all_coefs[key_byte,:] = beta[:] - - R2 = 1 - SSR / sst[None, :] - return R2, all_coefs - - -# Function to generate a model of bits -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model - - -# Wrapper function to map a model to an intermediate variable -def wrapper(y): - def curry(x): - return model_single_bits(x, y) - return curry - - -# Function to plot traces -def plot_traces(traces, nb): - for i in range(nb): - plt.plot(traces[i]) - plt.show() - - -def fixed_sst(traces): - num_traces, trace_length = traces.shape - u = np.zeros(trace_length) - v = np.zeros(trace_length) - for i in range(num_traces): - u += traces[i] - v += traces[i]**2 - return np.subtract(v,np.square(u)/num_traces) - - -class LRA(Engine): - def __init__(self, key_bytes=np.arange) -> None: - self.key_bytes = key_bytes - self.samples_len = 0 - self.traces_len = 0 - self.batch_size = 0 - self.batches_num = 0 - - self.average_traces = [] - self.aes_key = [] - - self.samples_range = None - self.samples_start = self.samples_end = 0 - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 
0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - - def update(self, traces: np.ndarray, plaintext: np.ndarray , average_traces): - for i in range(traces.shape[0]): - average_traces.add_trace(plaintext[i], traces[i]) - - return average_traces - - - def calculate(self, byte_idx, average_traces): - plain, trace = average_traces.get_data() - sst = fixed_sst(trace) - r2, coefs = lra(plain, trace, s_box_out, self.sbox, sst) - r2_peaks = np.max(r2, axis=1) - winning_byte = np.argmax(r2_peaks) - - print(f"Key Byte {byte_idx}: {winning_byte:02x}, Max R2: {np.max(r2_peaks):.5f}") - return winning_byte - - - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range - - self.average_traces = [AverageTraces(256, self.samples_range) for _ in range(len(container.model_positions))] # all key bytes - self.aes_key = [[] for _ in range(len(container.tiles))] - - with Pool(processes=int(os.cpu_count()/2)) as pool: - workload = [] - for tile in container.tiles: - (tile_x, tile_y) = tile - for model_pos in container.model_positions: - workload.append((self, container, self.average_traces[model_pos], tile_x, tile_y, model_pos)) - starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) - pool.close() - pool.join() - - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: - tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - - - @staticmethod - def run_workload(self, container, average_traces, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, [model_pos]) - - for batch in container.get_batches(tile_x, tile_y): - average_traces = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], average_traces) - - key_byte = self.calculate(model_pos, average_traces) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. 
If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits +from multiprocessing.pool import Pool +import numpy as np +import os + + +# Class to compute average traces +class AverageTraces: + def __init__(self, num_values, trace_length): + self.avtraces = np.zeros((num_values, trace_length)) + self.counters = np.zeros(num_values) + + # Method to add a trace and update the average + def add_trace(self, data, trace): + if self.counters[data] == 0: + self.avtraces[data] = trace + else: + self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data] + self.counters[data] += 1 + + # Method to get data with non-zero counters and corresponding average traces + def get_data(self): + avdata_snap = np.flatnonzero(self.counters) + avtraces_snap = self.avtraces[avdata_snap] + return avdata_snap, avtraces_snap + + +class LRA(Engine): + def __init__(self, model_value: ModelValue, convergence_step=None, normalize=False, bias=True) -> None: + self.normalize = normalize + + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + + def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces): + for i in range(traces.shape[0]): + average_traces.add_trace(plaintext[i], traces[i]) + + return average_traces + + def calculate(self, average_traces, model): + plain, traces = average_traces.get_data() + num_traces, trace_length = traces.shape + model_vals = model.shape[0] + + SST = np.sum(np.square(traces), axis=0) - np.square(np.sum(traces, axis=0))/num_traces + SSR = np.empty((model_vals, trace_length)) + + # Linear regression analysis (LRA) for each model position + P = np.linalg.pinv(model) + betas = P @ traces + # Below loop is equivalent to: + # E = model @ beta + # SSR = np.sum((E - traces)**2, axis=1) + # However this takes too much memory + for i in range(0, betas.shape[-1], step := 1): + E = model @ betas[..., i:i+step] + SSR[..., i:i+step] = np.sum((E - traces[:, i:i+step])**2, axis=1) + + R2s = 1 - SSR / SST[None, :] + if self.normalize: # Normalization + R2s = (R2s - np.mean(R2s, axis=0, keepdims=True)) / np.std(R2s, axis=0, keepdims=True) + + return R2s, betas + + def find_candidate(self, R2s): + r2_peaks = np.max(R2s, axis=1) + winning_candidate = np.argmax(r2_peaks) + return winning_candidate + + def run(self, container): + self.populate(container) + + with Pool(processes=int(os.cpu_count()/2)) as pool: + workload = [] + for tile in container.tiles: + (tile_x, tile_y) = tile + 
for model_pos in container.model_positions: + workload.append((self, container, tile_x, tile_y, model_pos)) + starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) + pool.close() + pool.join() + + for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: + self.R2s[model_pos] = r2s + self.betas[model_pos] = betas + tile_index = list(container.tiles).index((tile_x, tile_y)) + self.results[tile_index].append(candidate) + + @staticmethod + def run_workload(self, container, tile_x, tile_y, model_pos): + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = self.model_value.model.num_vals + average_traces = AverageTraces(model_vals, container.sample_length) + + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) + + traces_processed = 0 + converge_index = 0 + for batch in container.get_batches(tile_x, tile_y): + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + average_traces = self.update(traces, plaintext, average_traces) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(average_traces, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/engines/lra_i.py b/src/scarr/engines/lra_i.py index d664595..534d8e6 100644 --- a/src/scarr/engines/lra_i.py +++ b/src/scarr/engines/lra_i.py @@ -5,120 +5,48 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. 
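
Note on the rewritten lra.py engine above: its calculate() fits the per-value averaged traces against the bit decomposition of the hypothesized intermediate and derives R^2 from the residuals. The snippet below is a minimal, self-contained sketch of that fit for a single key hypothesis; bit_model, lra_r2, and the random avg_traces are illustrative names and data, not the engine's API.

import numpy as np

def bit_model(values, num_bits=8, bias=True):
    # Design matrix: one column per bit of the hypothesized intermediate,
    # plus an optional all-ones bias column (mirrors ModelBits with bias=True).
    bits = ((values[:, None] >> np.arange(num_bits)) & 1).astype(np.float64)
    if bias:
        bits = np.hstack([bits, np.ones((len(values), 1))])
    return bits

def lra_r2(avg_traces, intermediates):
    # Least-squares fit of every sample point against the bit model,
    # then coefficient of determination R^2 = 1 - SSR/SST per sample.
    M = bit_model(intermediates)
    betas = np.linalg.pinv(M) @ avg_traces
    residual = M @ betas - avg_traces
    ssr = np.sum(residual ** 2, axis=0)
    sst = np.sum((avg_traces - avg_traces.mean(axis=0)) ** 2, axis=0)
    return 1.0 - ssr / sst, betas

rng = np.random.default_rng(0)
intermediates = np.arange(256)              # e.g. Sbox(plaintext ^ key_guess)
avg_traces = rng.normal(size=(256, 100))    # per-value averaged traces
r2, betas = lra_r2(avg_traces, intermediates)

The engine performs the same fit batched over all key guesses at once (a pinv over the stacked hypothesis table) and chunks the SSR computation to keep memory bounded.
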
+ from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits import numpy as np from multiprocessing.pool import Pool import os -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model - - -def model_mean(byte_idx, model, group_n): - return np.average(model, axis=0, weights=group_n[:]) - - -def model_cov(byte_idx, model, mod_mean, group_n): - return np.cov(model - mod_mean[None,:], rowvar=False, - fweights=group_n[:]) - - -def covar_with_model(byte_idx, model, mod_mean, grouped_mean, ungrouped_mean, n_traces, group_n): - signal_diff = (grouped_mean[:,:] - - ungrouped_mean[None,:]) - model_diff = model - mod_mean[None,:] - return np.tensordot(model_diff * group_n[:,None], signal_diff, - axes=(0, 0)) / (n_traces - 1) - - -def ungrouped_stats(grouped_mean, grouped_var, group_n): - gm0 = grouped_mean - gn0 = group_n - mu = np.average(gm0, axis=0, weights=gn0) - var = (np.sum((gm0 - mu[None, :]) ** 2 * gn0[:, None], axis=0) + - np.sum(grouped_var[0] * (gn0 - 1)[:, None], axis=0)) - var /= max(np.sum(gn0) - 1, 1) - return mu, var - - -def lra(byte_idx, model_input_sbox, ungrouped_var, group_n, grouped_mean, ungrouped_mean, n_traces, n_points): - model_params = np.zeros((256, 16, n_points)) - total_rsq = np.zeros(256) - - for key_byte in range(256): - model_input_ptxt = model_input_sbox[np.arange(256) ^ key_byte] - mod_mean = model_mean(byte_idx, model_input_ptxt, group_n) - mod_cov = model_cov(byte_idx, model_input_ptxt, mod_mean, group_n) - mod_invcov = np.linalg.inv(mod_cov) - - cov = covar_with_model(byte_idx, model_input_ptxt, mod_mean, grouped_mean, ungrouped_mean, n_traces, group_n) - - model_params[key_byte,:,:] = np.matmul(mod_invcov, cov); - - # Total coefficient of determination, over all sample points. 
- total_rsq[key_byte] = ( - np.tensordot(cov / ungrouped_var[None,:], - model_params[key_byte,:,:], - axes=((0, 1), (0, 1)))) - - key_byte = np.argmax(total_rsq) - real_params = model_params[key_byte,:,:] - return key_byte, total_rsq, real_params - - class LRA(Engine): - def __init__(self): - self.key_bytes = np.arange(16) - self.aes_key = [] - self.samples_range = None - self.samples_start = self.samples_end = 0 - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - self.model_input = np.zeros((256, 16), dtype=np.float64) - for i in range(256): - self.model_input[i, :8] = [(i >> j) & 1 for j in range(8)] - self.model_input[i, 8:] = [(self.sbox[i] >> j) & 1 for j in range(8)] - - self.group_n = None - self.group_mean = None - self.group_M = None - - # Load traces and plaintext data - def load_data(self, container, i, model_pos): - batch = container.get_batch_index(i) - if not batch: - return None - trange = batch[-1][:, 1000:2000].astype(np.float64) - plaintext = batch[0] - return trange, plaintext - - - # Load data for batches + def __init__(self, model_value: ModelValue, convergence_step=None, bias=True) -> None: + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + def 
update(self, traces: np.ndarray, plaintext: np.ndarray, group_n, group_mean, group_M): num_traces, num_samples = traces.shape @@ -134,33 +62,36 @@ def update(self, traces: np.ndarray, plaintext: np.ndarray, group_n, group_mean, return group_n, group_mean, group_M - - def calculate(self, byte_idx, group_n, group_mean, group_M): - group_var = np.nan_to_num( - group_M / np.clip(group_n[:, None] - 1, 1, None)) - - un_mu, un_var = ungrouped_stats(group_mean, group_var, group_n) - n_traces = np.sum(group_n) - n_points = group_var.shape[1] - - k, r2, _ = lra(byte_idx, self.model_input, un_var, - group_n, group_mean, un_mu, - n_traces, n_points) - print("Key Byte {:2d}: {:02x}, Max R2: {:.5f}".format( - byte_idx, k, np.max(r2))) - return k - - - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range - - self.aes_key = [[] for _ in range(len(container.tiles))] + def calculate(self, grouped_n, grouped_mean, grouped_sumsqdiff, model): + # Using sums of sq. differences instead of co/var eliminates divisions that would cancel out in the end + def _ungrouped_stats(grouped_mean, grouped_sumsqdiff, group_n): + ungrouped_mean = np.average(grouped_mean, axis=0, weights=group_n) + between = np.sum((grouped_mean - ungrouped_mean) ** 2 * group_n[..., None], axis=0) + within = np.sum(grouped_sumsqdiff, axis=0) + ungrouped_sumsqdiff = between + within + return ungrouped_mean, ungrouped_sumsqdiff + + ungrouped_mean, ungrouped_sumsqdiff = _ungrouped_stats(grouped_mean, grouped_sumsqdiff, grouped_n) + + mod_mean = np.average(model, axis=1, weights=grouped_n) + model_diff = model - mod_mean + model_sumsqdiff = ((model_diff.swapaxes(-1, -2) * grouped_n) @ model_diff) + mod_invcov = np.linalg.inv(model_sumsqdiff) + signal_diff = grouped_mean - ungrouped_mean + combined_sumsqdiff = ((model_diff.swapaxes(-1, -2) * grouped_n) @ signal_diff) + + betas = np.matmul(mod_invcov, combined_sumsqdiff) + r2s = np.sum(np.divide((combined_sumsqdiff * betas), ungrouped_sumsqdiff), axis=1) + + return r2s, betas + + def find_candidate(self, R2s): + aggregated = np.sum(R2s, axis=1) + winning_candidate = np.argmax(aggregated) + return winning_candidate + + def run(self, container): + self.populate(container) with Pool(processes=int(os.cpu_count()/2)) as pool: workload = [] @@ -172,25 +103,44 @@ def run(self, container, samples_range=None): pool.close() pool.join() - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: + for tile_x, tile_y, model_pos, tmp_key_byte, tmp_r2s, tmp_betas in starmap_results: + self.R2s[model_pos] = tmp_r2s + self.betas[model_pos] = tmp_betas tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - + self.results[tile_index].append(tmp_key_byte) @staticmethod def run_workload(self, container, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, model_pos) - group_n = np.zeros((256), dtype=np.uint32) - group_mean = np.zeros((256, self.samples_range), dtype=np.float64) + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = 
self.model_value.model.num_vals + group_n = np.zeros((model_vals), dtype=np.uint32) + group_mean = np.zeros((model_vals, container.sample_length), dtype=np.float64) group_M = np.zeros_like(group_mean) - for batch in container.get_batches(tile_x, tile_y): - (group_n, group_mean, group_M) = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], group_n, group_mean, group_M) + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) - key_byte = self.calculate(model_pos, group_n, group_mean, group_M) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file + traces_processed = 0 + converge_index = 0 + for batch in container.get_batches(tile_x, tile_y): + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(group_n, group_mean, group_M, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + group_n, group_mean, group_M = self.update(traces, plaintext, group_n, group_mean, group_M) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(group_n, group_mean, group_M, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/engines/lra_wht.py b/src/scarr/engines/lra_wht.py index afbf1be..cce8599 100644 --- a/src/scarr/engines/lra_wht.py +++ b/src/scarr/engines/lra_wht.py @@ -5,7 +5,10 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. 
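
The incremental lra_i.py variant above never revisits raw traces: update() maintains per-group counts, means, and sums of squared differences, and calculate() rebuilds the ungrouped statistics from the between-group and within-group parts. The update body itself is unchanged context in this diff, so the sketch below shows a generic Welford-style accumulator of that form; the shapes and names are assumptions chosen to mirror the diff.

import numpy as np

def update_groups(traces, model_values, group_n, group_mean, group_M):
    # Welford-style streaming update of count, mean, and sum of squared
    # differences per model value, so R^2 can later be computed from
    # grouped statistics alone.
    for value, trace in zip(model_values, traces):
        group_n[value] += 1
        delta = trace - group_mean[value]
        group_mean[value] += delta / group_n[value]
        group_M[value] += delta * (trace - group_mean[value])
    return group_n, group_mean, group_M

samples = 50
group_n = np.zeros(256, dtype=np.uint32)
group_mean = np.zeros((256, samples))
group_M = np.zeros((256, samples))

rng = np.random.default_rng(1)
traces = rng.normal(size=(1000, samples))
values = rng.integers(0, 256, size=1000)
group_n, group_mean, group_M = update_groups(traces, values, group_n, group_mean, group_M)

With these accumulators, the total sum of squared differences decomposes into between-group plus within-group terms, which is exactly what the new _ungrouped_stats helper inside calculate() exploits.
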
+ from .engine import Engine +from ..model_values.model_value import ModelValue +from ..model_values.model_bits import ModelBits import numpy as np from multiprocessing.pool import Pool import os @@ -13,6 +16,7 @@ from scipy.linalg import hadamard from scipy.linalg import solve_triangular + class AverageTraces: def __init__(self, num_values, trace_length): self.avtraces = np.zeros((num_values, trace_length)) @@ -31,171 +35,147 @@ def get_data(self): avdata_snap = np.flatnonzero(self.counters) avtraces_snap = self.avtraces[avdata_snap] return avdata_snap, avtraces_snap - - -# Function to compute S-box output -def s_box_out(data, key_byte, sbox): - s_box_in = data ^ key_byte - return sbox[s_box_in] - - -def WHT(x): - n = x.shape[0] - H = np.array(hadamard(n)) - return H @ x / n - - -def iWHT(x): - n = x.shape[0] - H = np.array(hadamard(n)) - return H @ x - - -# Linear regression analysis (LRA) for each byte of the key -def lra(data, traces, sbox, sst): - num_traces, trace_length = traces.shape - R2 = np.empty((256, trace_length)) - - intermediate_var = s_box_out(data, 0, sbox) - M0 = np.array(list(map(wrapper(8), intermediate_var))) - - Tm = np.linalg.cholesky(M0.T @ M0) - U0 = solve_triangular(Tm.T, M0.T, lower=True) - - U0_wht = np.zeros_like(U0) - U0_iwht = np.zeros_like(U0_wht) - for p in range(U0.shape[0]): - U0_wht[p,:] = WHT(U0[p,:]) - - for i in range(trace_length): - WL = WHT(traces[:, i]) - for p in range(U0_wht.shape[0]-1): - U0_iwht[p,:] = iWHT(U0_wht[p,:] * WL[:]) - for key_byte in range(256): - SSR = np.sum((U0_iwht[:-1,key_byte])**2) - R2[key_byte,i] = SSR / sst[i] - - return R2 - - -def model_single_bits(x, bit_width): - model = [] - for i in range(0, bit_width): - bit = (x >> i) & 1 - model.append(bit) - model.append(1) - return model - - -def wrapper(y): - def curry(x): - return model_single_bits(x, y) - return curry class LRA(Engine): - def __init__(self, key_bytes=np.arange): - self.key_bytes = key_bytes - self.samples_len = self.traces_len = 0 - self.batch_size = self.batches_num = 0 - self.average_traces = [] - self.aes_key = [] - self.samples_range = None - - # S-box definition - self.sbox = np.array([ - 0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76, - 0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0, - 0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15, - 0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75, - 0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84, - 0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF, - 0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8, - 0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2, - 0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73, - 0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB, - 0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79, - 0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08, - 0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A, - 0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E, - 0xE1, 0xF8, 0x98, 
0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF, - 0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16 - ]) - - - def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces, n_tr, v, u): + def __init__(self, model_value: ModelValue, convergence_step=None, bias=True) -> None: + self.convergence_step = convergence_step + self.R2s = None + self.betas = None + self.results = [] + + super().__init__(ModelBits(model_value, bias)) + + def get_R2s(self): + return self.R2s + + def get_betas(self): + return self.betas + + def get_results(self): + return self.results + + def populate(self, container): + model_vals = self.model_value.model.num_vals + num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1 + + # R2s and Beta values, Results + self.R2s = np.zeros((len(container.model_positions), + num_steps, + model_vals, + container.sample_length), dtype=np.float64) + self.betas = np.zeros((len(container.model_positions), + model_vals, + self.model_value.num_bits, + container.sample_length), dtype=np.float64) + self.results = [[] for _ in range(len(container.tiles))] + + def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces, n_traces, sumsq, sums): plaintext = plaintext.reshape(-1) for i in range(traces.shape[0]): t = traces[i] g = int(plaintext[i]) average_traces.add_trace(g, t) - u += t - v += t * t - n_tr += 1 - return average_traces, n_tr, v, u + sums += np.sum(traces, axis=0) + sumsq += np.sum(traces**2, axis=0) + n_traces += traces.shape[0] + return average_traces, n_traces, sumsq, sums - def calculate(self, byte_idx, average_traces, n_tr, v, u): - plain, trace = average_traces.get_data() - sst = v - (u ** 2) / n_tr - r2 = lra(plain, trace, self.sbox, sst) - r2_peaks = np.max(r2, axis=1) - winning_byte = int(np.argmax(r2_peaks)) + def calculate(self, average_traces, n_traces, sumsq, sums, model): + # Walsh-Hadamard Transforms + def _WHT(x): + n = x.shape[-2] + H = np.array(hadamard(n)) + return H @ x / n + def _iWHT(x): + n = x.shape[-2] + H = np.array(hadamard(n)) + return H @ x - print(f"Key Byte {byte_idx}: {winning_byte:02x}") - return winning_byte + plain, traces = average_traces.get_data() + num_traces, trace_length = traces.shape + model_vals = model.shape[0] + SST = sumsq - (sums ** 2) / num_traces + SSR = np.empty((model_vals, trace_length)) - def finalize(self): - pass - - - def run(self, container, samples_range=None): - if samples_range == None: - self.samples_range = container.data.sample_length - self.samples_start = 0 - self.samples_end = container.data.sample_length - else: - self.samples_range = samples_range[1]-samples_range[0] - (self.samples_start, self.samples_end) = samples_range - - self.average_traces = [AverageTraces(256, self.samples_range) for _ in container.model_positions] # all key bytes - self.aes_key = [[] for _ in range(len(container.tiles))] + R2s = np.empty((model_vals, trace_length)) + + M0 = model[0].astype(np.uint32) + Tm = np.linalg.cholesky(M0.T @ M0) + U0 = solve_triangular(Tm.T, M0.T, lower=True) + + U0_wht = _WHT(U0.T) + + WL = _WHT(traces) + U0_iwht = _iWHT(WL.T[..., None] * U0_wht) + + betas = U0_iwht.transpose(1, 2, 0) + SSR = np.sum(betas**2, axis=1) + R2s = SSR / SST + + return R2s, betas + + def find_candidate(self, R2s): + r2_peaks = np.max(R2s, axis=1) + winning_candidate = int(np.argmax(r2_peaks)) + return winning_candidate + + def run(self, container): + self.populate(container) with 
Pool(processes=int(os.cpu_count()/2)) as pool: workload = [] for tile in container.tiles: (tile_x, tile_y) = tile for model_pos in container.model_positions: - workload.append((self, container, self.average_traces[model_pos], tile_x, tile_y, model_pos)) + workload.append((self, container, tile_x, tile_y, model_pos)) starmap_results = pool.starmap(self.run_workload, workload, chunksize=1) pool.close() pool.join() - for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results: + for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results: + self.R2s[model_pos] = r2s + self.betas[model_pos] = betas tile_index = list(container.tiles).index((tile_x, tile_y)) - self.aes_key[tile_index].append(tmp_key_byte) - - # Print recovered AES key(s) - for key in self.aes_key: - aes_key_bytes = bytes(key) - print("Recovered AES Key:", aes_key_bytes.hex()) - + self.results[tile_index].append(candidate) @staticmethod - def run_workload(self, container, average_traces, tile_x, tile_y, model_pos): - container.configure(tile_x, tile_y, [model_pos]) - v = np.zeros(self.samples_range) - u = np.zeros(self.samples_range) - n_tr = 0 - + def run_workload(self, container, tile_x, tile_y, model_pos): + num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step) + if self.convergence_step is None: + self.convergence_step = np.inf + + model_vals = self.model_value.model.num_vals + average_traces = AverageTraces(model_vals, container.sample_length) + sumsq = np.zeros(container.sample_length) + sums = np.zeros(container.sample_length) + n_traces = 0 + + r2s = np.empty((num_steps, model_vals, container.sample_length)) + + model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis] + hypotheses = self.model_value.calculate_table([model_input, None, None, None]) + + traces_processed = 0 + converge_index = 0 for batch in container.get_batches(tile_x,tile_y): - (average_traces, tmp_n_tr, tmp_v, tmp_u) = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], average_traces, n_tr, v, u) - v += tmp_v - u += tmp_u - n_tr += tmp_n_tr - - key_byte = self.calculate(model_pos, average_traces, n_tr, v, u) - return tile_x, tile_y, model_pos, key_byte \ No newline at end of file + if traces_processed >= self.convergence_step: + instant_r2s, _ = self.calculate(average_traces, n_traces, sumsq, sums, hypotheses) + r2s[converge_index, :, :] = instant_r2s + traces_processed = 0 + converge_index += 1 + # Update + plaintext = batch[0] + traces = batch[-1] + average_traces, n_traces, sumsq, sums = self.update(traces, plaintext, average_traces, n_traces, sumsq, sums) + traces_processed += traces.shape[0] + + instant_r2s, betas = self.calculate(average_traces, n_traces, sumsq, sums, hypotheses) + r2s[converge_index, :, :] = instant_r2s + candidate = self.find_candidate(r2s[-1, ...]) + + return tile_x, tile_y, model_pos, candidate, r2s, betas diff --git a/src/scarr/model_values/model_bits.py b/src/scarr/model_values/model_bits.py new file mode 100644 index 0000000..50cb22c --- /dev/null +++ b/src/scarr/model_values/model_bits.py @@ -0,0 +1,53 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. 
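
The lra_wht.py variant above whitens the bit model once (Cholesky followed by solve_triangular) and then scores every key guess with Walsh-Hadamard transforms. The trick it relies on is the dyadic convolution identity: an inner product against a key-XORed table, evaluated for all key guesses, reduces to two WHTs and a pointwise product. The sketch below checks that identity against brute force; it is illustrative only and uses the same unnormalized scipy hadamard matrix as the diff.

import numpy as np
from scipy.linalg import hadamard

n = 256                      # 8-bit intermediate, so 256 key guesses
H = hadamard(n)              # Sylvester-ordered Walsh-Hadamard matrix
rng = np.random.default_rng(2)
f = rng.normal(size=n)       # e.g. one row of the whitened model
g = rng.normal(size=n)       # e.g. one sample point of the averaged traces

# Brute force: one inner product per key guess k
brute = np.array([np.dot(f, g[np.arange(n) ^ k]) for k in range(n)])

# WHT route: transform both inputs, multiply pointwise, transform back
fast = H @ ((H @ f) * (H @ g)) / n

assert np.allclose(brute, fast)

Squaring and summing these projections over the whitened model rows yields the regression sum of squares per key guess, which is where the R2s = SSR / SST expression in calculate() comes from.
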
+ + +import numpy as np +from .guess_model_value import GuessModelValue + + +class ModelBits(GuessModelValue): + + def __init__(self, model, bias=True) -> None: + self.num_vals = 2 + self.vals = np.arange(2) + self.num_bits = (model.num_vals-1).bit_length() * len(model) + (1 if bias else 0) + self.model = model + self.bias = bias + + def calculate(self, batch): + outputs = self.model.calculate(batch) + + unpacked = np.unpackbits(outputs[..., np.newaxis], axis=-1, bitorder='little') + + if self.bias: + bias_bits = np.ones(unpacked.shape[:-1]+(1,), dtype='uint8') + unpacked = np.concatenate((unpacked, bias_bits), axis=-1) + + return unpacked + + def calculate_table(self, batch): + outputs = self.model.calculate_table(batch) + + unpacked = np.unpackbits(np.atleast_3d(outputs), axis=-1, bitorder='little') + + if self.bias: + bias_bits = np.ones(unpacked.shape[:-1]+(1,), dtype='uint8') + unpacked = np.concatenate((unpacked, bias_bits), axis=-1) + + return unpacked + + def calculate_all_tables(self, batch): + outputs = self.model.calculate_all_tables(batch) + + unpacked = np.unpackbits(outputs, axis=-2, bitorder='little') + if self.bias: + bias_shape = unpacked.shape[:-2] + (1,) + unpacked.shape[-1:] + bias_bits = np.ones(bias_shape, dtype=unpacked.dtype) + unpacked = np.concatenate((unpacked, bias_bits), axis=-2) + + return unpacked diff --git a/src/scarr/model_values/model_value.py b/src/scarr/model_values/model_value.py index 49b8fbe..2d7f863 100644 --- a/src/scarr/model_values/model_value.py +++ b/src/scarr/model_values/model_value.py @@ -13,5 +13,8 @@ class ModelValue: def __init__(self) -> None: pass + def __len__(self): + return 1 + def calculate(self, batch): pass diff --git a/src/scarr/model_values/multi_model.py b/src/scarr/model_values/multi_model.py new file mode 100644 index 0000000..511fa86 --- /dev/null +++ b/src/scarr/model_values/multi_model.py @@ -0,0 +1,47 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +import numpy as np +from .model_value import ModelValue +from .guess_model_value import GuessModelValue + + +class MultiModel(GuessModelValue): + + def __init__(self, models) -> None: + self.models = models + # Expects that all models produce same val range + self.num_vals = models[0].num_vals + self.vals = models[0].vals + + def __len__(self): + return len(self.models) + + def calculate(self, batch): + all_models_output = self.models[0].calculate(batch)[np.newaxis, ...] + for m in self.models[1:]: + new_output = m.calculate(batch)[np.newaxis, ...] 
+ all_models_output = np.concatenate((all_models_output, new_output), axis=-1) + + return all_models_output + + def calculate_table(self, batch): + all_models_output = self.models[0].calculate_table(batch)[..., np.newaxis] + for m in self.models[1:]: + new_output = m.calculate_table(batch)[..., np.newaxis] + all_models_output = np.concatenate((all_models_output, new_output), axis=-1) + + return all_models_output + + def calculate_all_tables(self, batch): + all_models_output = self.models[0].calculate_all_tables(batch)[..., np.newaxis] + for m in self.models[1:]: + new_output = m.calculate_all_tables(batch)[..., np.newaxis] + all_models_output = np.concatenate((all_models_output, new_output), axis=-1) + + return all_models_output.swapaxes(-1, -2) diff --git a/src/scarr/model_values/sbox.py b/src/scarr/model_values/sbox.py new file mode 100644 index 0000000..a8338d3 --- /dev/null +++ b/src/scarr/model_values/sbox.py @@ -0,0 +1,36 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + + +import numpy as np +from .guess_model_value import GuessModelValue +from .utils import AES_SBOX, KEYS + + +class Sbox(GuessModelValue): + + def __init__(self) -> None: + self.num_vals = 256 + self.vals = np.arange(256) + + def calculate(self, batch): + inputs = np.bitwise_xor(np.squeeze(batch[0]), np.squeeze(batch[1]), dtype=np.uint8) + + return AES_SBOX[inputs] + + def calculate_table(self, batch): + inputs = np.bitwise_xor(np.squeeze(batch[0]), KEYS, dtype=np.uint8) + + return AES_SBOX[inputs] + + def calculate_all_tables(self, batch): + return np.apply_along_axis(self.calculate_all_tables_helper, axis=1, arr=batch[0]) + + def calculate_all_tables_helper(self, data): + inputs = np.bitwise_xor(data, KEYS, dtype=np.uint8) + + return AES_SBOX[inputs] diff --git a/src/scarr/model_values/utils.py b/src/scarr/model_values/utils.py index 215c00a..cc02bf1 100644 --- a/src/scarr/model_values/utils.py +++ b/src/scarr/model_values/utils.py @@ -8,7 +8,7 @@ import numpy as np # The key hypotheseses which are the hex values 0-255 -KEYS = np.arange(256).reshape(-1, 1) +KEYS = np.arange(256, dtype='uint8').reshape(-1, 1) # AES-128 sbox used to compute model values AES_SBOX = np.array([99,124,119,123,242,107,111,197,48,1,103,43,254,215,171,118, @@ -26,7 +26,8 @@ 186,120,37,46,28,166,180,198,232,221,116,31,75,189,139,138, 112,62,181,102,72,3,246,14,97,53,87,185,134,193,29,158, 225,248,152,17,105,217,142,148,155,30,135,233,206,85,40,223, - 140,161,137,13,191,230,66,104,65,153,45,15,176,84,187,22]) + 140,161,137,13,191,230,66,104,65,153,45,15,176,84,187,22], + dtype='uint8') # Hamming weights of the values 0-255 used for model values WEIGHTS = np.array([0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,
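
For reference, the new ModelBits wrapper above turns each uint8 model output into a row of single-bit regressors, optionally followed by an all-ones bias column, via np.unpackbits with bitorder='little'. A minimal sketch of that expansion, using made-up S-box outputs:

import numpy as np

outputs = np.array([0x63, 0x7C, 0x77], dtype=np.uint8)   # e.g. Sbox(pt ^ key)
bits = np.unpackbits(outputs[..., np.newaxis], axis=-1, bitorder='little')
bias = np.ones(bits.shape[:-1] + (1,), dtype=np.uint8)
design = np.concatenate((bits, bias), axis=-1)           # shape (3, 9)

# 0x63 = 0b01100011, emitted least-significant bit first, then the bias term
assert design[0].tolist() == [1, 1, 0, 0, 0, 1, 1, 0, 1]
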