Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 160 additions & 193 deletions src/scarr/engines/lra.py
Original file line number Diff line number Diff line change
@@ -1,193 +1,160 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.

from .engine import Engine
from multiprocessing.pool import Pool
import numpy as np
import os

import matplotlib.pyplot as plt

# Class to compute average traces
# Class to compute average traces
class AverageTraces:
    """Per-value running averages of traces.

    Keeps one running-average trace and one observation counter per
    possible data value (e.g. 256 for a plaintext byte).
    """

    def __init__(self, num_values, trace_length):
        self.avtraces = np.zeros((num_values, trace_length))
        self.counters = np.zeros(num_values)

    # Method to add a trace and update the average
    def add_trace(self, data, trace):
        """Fold `trace` into the running average for value `data`.

        Uses the incremental mean update mean_k = mean_{k-1} + (x_k - mean_{k-1}) / k.
        """
        # Bug fix: the divisor must be the NEW count k, not the old count.
        # Previously the code divided by the pre-increment counter, so the
        # second trace (counter == 1) completely overwrote the first instead
        # of averaging with it. Increment first, then update.
        self.counters[data] += 1
        if self.counters[data] == 1:
            self.avtraces[data] = trace
        else:
            self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data]

    # Method to get data with non-zero counters and corresponding average traces
    def get_data(self):
        """Return (observed data values, their average traces)."""
        avdata_snap = np.flatnonzero(self.counters)
        avtraces_snap = self.avtraces[avdata_snap]
        return avdata_snap, avtraces_snap


# Function to compute S-box output
# Function to compute S-box output
def s_box_out(data, key_byte, sbox):
    """Return sbox[data XOR key_byte] (AddRoundKey then SubBytes lookup)."""
    sbox_input = data ^ key_byte
    return sbox[sbox_input]


# Linear regression analysis (LRA) for each byte of the key
# Linear regression analysis (LRA) for each byte of the key
def lra(data, traces, intermediate_fkt, sbox, sst):
    """Run linear regression analysis over all 256 key-byte hypotheses.

    Parameters:
        data: 1-D array of data values (e.g. plaintext bytes), one per trace row.
        traces: (num_traces, trace_length) array of (averaged) traces.
        intermediate_fkt: callable (data, key_byte, sbox) -> intermediate values.
        sbox: substitution table passed through to intermediate_fkt.
        sst: per-sample total sum of squares used to normalize SSR into R2.

    Returns:
        R2: (256, trace_length) coefficient of determination per hypothesis.
        all_coefs: (256, 9, trace_length) fitted regression coefficients.
    """
    num_traces, trace_length = traces.shape
    SSR = np.empty((256, trace_length))
    all_coefs = np.empty((256, 9, trace_length))

    for key_byte in np.arange(256, dtype='uint8'):
        intermediate_var = intermediate_fkt(data, key_byte, sbox)
        # Design matrix: one column per bit of the 8-bit intermediate value
        # plus a constant intercept column (9 regressors total).
        M = np.array([[(v >> i) & 1 for i in range(8)] + [1]
                      for v in intermediate_var])

        # Bug fix: the previous explicit normal-equation solve
        # inv(M.T @ M) @ M.T raises LinAlgError whenever M is rank-deficient
        # (e.g. a bit that is constant over the observed data values).
        # The pseudo-inverse gives the same least-squares fit for full-rank M
        # and a well-defined minimum-norm solution otherwise.
        beta = np.linalg.pinv(M) @ traces

        E = M @ beta  # predicted traces under this key hypothesis
        SSR[key_byte] = np.sum((E - traces) ** 2, axis=0)
        all_coefs[key_byte, :] = beta[:]

    # Goodness of fit per hypothesis and sample point.
    R2 = 1 - SSR / sst[None, :]
    return R2, all_coefs


# Function to generate a model of bits
# Function to generate a model of bits
def model_single_bits(x, bit_width):
    """Expand x into [bit_0, ..., bit_{bit_width-1}, 1].

    The trailing 1 is the constant (intercept) regressor.
    """
    bits = [(x >> shift) & 1 for shift in range(bit_width)]
    bits.append(1)
    return bits


# Wrapper function to map a model to an intermediate variable
# Wrapper function to map a model to an intermediate variable
def wrapper(y):
    """Return a one-argument function expanding x into its y-bit model."""
    return lambda x: model_single_bits(x, y)


# Function to plot traces
# Function to plot traces
def plot_traces(traces, nb):
    """Plot the first `nb` traces on one figure and display it."""
    for idx in range(nb):
        plt.plot(traces[idx])
    plt.show()


def fixed_sst(traces):
    """Return the total sum of squares (SST) per sample point.

    SST[j] = sum_i traces[i, j]^2 - (sum_i traces[i, j])^2 / num_traces,
    i.e. num_traces times the per-column variance.

    Parameters:
        traces: (num_traces, trace_length) array.

    Returns:
        1-D array of length trace_length.
    """
    # Vectorized replacement for the previous per-trace Python loop:
    # identical arithmetic, computed with NumPy reductions.
    num_traces = traces.shape[0]
    sum_sq = np.sum(np.square(traces), axis=0)
    sq_sum = np.square(np.sum(traces, axis=0)) / num_traces
    return np.subtract(sum_sq, sq_sum)


class LRA(Engine):
    """Linear Regression Analysis engine: recovers AES key bytes by fitting
    a single-bit linear model of the S-box output to averaged traces and
    picking, per key byte, the hypothesis with the highest R2 peak.
    """

    def __init__(self, key_bytes=np.arange) -> None:
        # NOTE(review): the default `np.arange` (the function object itself)
        # looks unintentional, but `key_bytes` is never read in this file,
        # so the default is kept for interface compatibility — confirm with
        # callers before changing it.
        self.key_bytes = key_bytes
        self.samples_len = 0
        self.traces_len = 0
        self.batch_size = 0
        self.batches_num = 0

        self.average_traces = []  # one AverageTraces accumulator per model position
        self.aes_key = []         # recovered key bytes, one list per tile

        self.samples_range = None  # number of sample points analysed per trace
        self.samples_start = self.samples_end = 0

        # AES forward S-box (SubBytes lookup table).
        self.sbox = np.array([
            0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76,
            0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0,
            0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15,
            0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75,
            0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84,
            0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF,
            0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8,
            0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2,
            0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73,
            0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB,
            0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79,
            0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08,
            0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A,
            0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E,
            0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF,
            0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16
        ])


    def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces):
        """Fold a batch of traces into the per-plaintext-value averages."""
        for i in range(traces.shape[0]):
            average_traces.add_trace(plaintext[i], traces[i])

        return average_traces


    def calculate(self, byte_idx, average_traces):
        """Run LRA on the accumulated averages and return the winning key byte."""
        plain, trace = average_traces.get_data()
        sst = fixed_sst(trace)
        r2, coefs = lra(plain, trace, s_box_out, self.sbox, sst)
        # Best R2 over all sample points, per key hypothesis.
        r2_peaks = np.max(r2, axis=1)
        winning_byte = np.argmax(r2_peaks)

        print(f"Key Byte {byte_idx}: {winning_byte:02x}, Max R2: {np.max(r2_peaks):.5f}")
        return winning_byte


    def run(self, container, samples_range=None):
        """Process all tiles/model positions of `container` and print the key.

        samples_range: optional (start, end) slice of sample points to analyse;
        defaults to the container's full sample length.
        """
        # Bug fix: compare against None with `is`, not `==` (non-idiomatic,
        # and broken if a numpy array were ever passed).
        if samples_range is None:
            self.samples_range = container.data.sample_length
            self.samples_start = 0
            self.samples_end = container.data.sample_length
        else:
            self.samples_range = samples_range[1]-samples_range[0]
            (self.samples_start, self.samples_end) = samples_range

        self.average_traces = [AverageTraces(256, self.samples_range) for _ in range(len(container.model_positions))] # all key bytes
        self.aes_key = [[] for _ in range(len(container.tiles))]

        # Bug fix: os.cpu_count() may return None, and on a single-core host
        # int(cpu/2) evaluates to 0, which Pool rejects — clamp to >= 1.
        with Pool(processes=max(1, (os.cpu_count() or 2) // 2)) as pool:
            workload = []
            for tile in container.tiles:
                (tile_x, tile_y) = tile
                for model_pos in container.model_positions:
                    workload.append((self, container, self.average_traces[model_pos], tile_x, tile_y, model_pos))
            starmap_results = pool.starmap(self.run_workload, workload, chunksize=1)
            pool.close()
            pool.join()

        for tile_x, tile_y, model_pos, tmp_key_byte in starmap_results:
            tile_index = list(container.tiles).index((tile_x, tile_y))
            self.aes_key[tile_index].append(tmp_key_byte)

        # Print recovered AES key(s)
        for key in self.aes_key:
            aes_key_bytes = bytes(key)
            print("Recovered AES Key:", aes_key_bytes.hex())


    # Declared @staticmethod but takes the engine instance explicitly: the
    # instance is shipped to the worker as an ordinary starmap argument so
    # the Pool can pickle it.
    @staticmethod
    def run_workload(self, container, average_traces, tile_x, tile_y, model_pos):
        """Worker: accumulate averages for one (tile, key byte) and solve it."""
        container.configure(tile_x, tile_y, [model_pos])

        for batch in container.get_batches(tile_x, tile_y):
            # batch[0] holds the plaintext bytes, batch[-1] the trace samples.
            average_traces = self.update(batch[-1][:,self.samples_start:self.samples_end], batch[0], average_traces)

        key_byte = self.calculate(model_pos, average_traces)
        return tile_x, tile_y, model_pos, key_byte
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.


from .engine import Engine
from ..model_values.model_value import ModelValue
from ..model_values.model_bits import ModelBits
from multiprocessing.pool import Pool
import numpy as np
import os


# Class to compute average traces
class AverageTraces:
def __init__(self, num_values, trace_length):
self.avtraces = np.zeros((num_values, trace_length))
self.counters = np.zeros(num_values)

# Method to add a trace and update the average
def add_trace(self, data, trace):
if self.counters[data] == 0:
self.avtraces[data] = trace
else:
self.avtraces[data] = self.avtraces[data] + (trace - self.avtraces[data]) / self.counters[data]
self.counters[data] += 1

# Method to get data with non-zero counters and corresponding average traces
def get_data(self):
avdata_snap = np.flatnonzero(self.counters)
avtraces_snap = self.avtraces[avdata_snap]
return avdata_snap, avtraces_snap


class LRA(Engine):
    """Linear Regression Analysis (LRA) engine.

    Fits, per candidate value, a linear model of single-bit regressors
    (built by ModelBits from the user-supplied model) to per-value averaged
    traces, and ranks candidates by their peak R2.
    """

    def __init__(self, model_value: ModelValue, convergence_step=None, normalize=False, bias=True) -> None:
        # When True, R2 curves are standardized across the candidate axis.
        self.normalize = normalize

        # Number of traces between intermediate R2 snapshots; None disables
        # convergence tracking (only the final snapshot is computed).
        self.convergence_step = convergence_step
        self.R2s = None
        self.betas = None
        self.results = []

        # Expand the user model into single-bit regressors (plus an intercept
        # column when bias=True).
        super().__init__(ModelBits(model_value, bias))

    def get_R2s(self):
        """Return the (positions, steps, candidates, samples) R2 array."""
        return self.R2s

    def get_betas(self):
        """Return the fitted regression coefficients."""
        return self.betas

    def get_results(self):
        """Return the winning candidate per tile/model position."""
        return self.results

    def populate(self, container):
        """Allocate result arrays sized from the container's dimensions."""
        model_vals = self.model_value.model.num_vals
        # -(a // -b) is ceiling division: number of convergence snapshots.
        num_steps = -(container.data.traces_length // -self.convergence_step) if self.convergence_step else 1

        # R2s and Beta values, Results
        self.R2s = np.zeros((len(container.model_positions),
                             num_steps,
                             model_vals,
                             container.sample_length), dtype=np.float64)
        self.betas = np.zeros((len(container.model_positions),
                               model_vals,
                               self.model_value.num_bits,
                               container.sample_length), dtype=np.float64)
        self.results = [[] for _ in range(len(container.tiles))]

    def update(self, traces: np.ndarray, plaintext: np.ndarray, average_traces):
        """Fold a batch of traces into the per-value running averages."""
        for i in range(traces.shape[0]):
            average_traces.add_trace(plaintext[i], traces[i])

        return average_traces

    def calculate(self, average_traces, model):
        """Least-squares fit of `model` to the averaged traces; return (R2s, betas).

        `model` is a stacked design matrix with one leading entry per
        candidate value; np.linalg.pinv batches over the leading dimensions.
        """
        plain, traces = average_traces.get_data()
        num_traces, trace_length = traces.shape
        model_vals = model.shape[0]

        # Total sum of squares per sample point.
        SST = np.sum(np.square(traces), axis=0) - np.square(np.sum(traces, axis=0))/num_traces
        SSR = np.empty((model_vals, trace_length))

        # Linear regression analysis (LRA) for each model position
        P = np.linalg.pinv(model)
        betas = P @ traces
        # Below loop is equivalent to:
        # E = model @ beta
        # SSR = np.sum((E - traces)**2, axis=1)
        # However this takes too much memory
        for i in range(0, betas.shape[-1], step := 1):
            E = model @ betas[..., i:i+step]
            SSR[..., i:i+step] = np.sum((E - traces[:, i:i+step])**2, axis=1)

        R2s = 1 - SSR / SST[None, :]
        if self.normalize: # Normalization
            R2s = (R2s - np.mean(R2s, axis=0, keepdims=True)) / np.std(R2s, axis=0, keepdims=True)

        return R2s, betas

    def find_candidate(self, R2s):
        """Return the candidate index with the highest R2 over all samples."""
        r2_peaks = np.max(R2s, axis=1)
        winning_candidate = np.argmax(r2_peaks)
        return winning_candidate

    def run(self, container):
        """Process all tiles/model positions of `container` in parallel."""
        self.populate(container)

        # Bug fix: os.cpu_count() may return None, and on a single-core host
        # int(cpu/2) evaluates to 0, which Pool rejects — clamp to >= 1.
        with Pool(processes=max(1, (os.cpu_count() or 2) // 2)) as pool:
            workload = []
            for tile in container.tiles:
                (tile_x, tile_y) = tile
                for model_pos in container.model_positions:
                    workload.append((self, container, tile_x, tile_y, model_pos))
            starmap_results = pool.starmap(self.run_workload, workload, chunksize=1)
            pool.close()
            pool.join()

        for tile_x, tile_y, model_pos, candidate, r2s, betas in starmap_results:
            self.R2s[model_pos] = r2s
            self.betas[model_pos] = betas
            tile_index = list(container.tiles).index((tile_x, tile_y))
            self.results[tile_index].append(candidate)

    # Declared @staticmethod but takes the engine instance explicitly: the
    # instance is shipped to the worker as an ordinary starmap argument so
    # the Pool can pickle it; mutations of `self` below affect only the
    # worker-process copy.
    @staticmethod
    def run_workload(self, container, tile_x, tile_y, model_pos):
        """Worker: accumulate averages for one (tile, position) and solve it."""
        num_steps = container.configure(tile_x, tile_y, [model_pos], self.convergence_step)
        if self.convergence_step is None:
            # Disable intermediate snapshots: the threshold is never reached.
            self.convergence_step = np.inf

        model_vals = self.model_value.model.num_vals
        average_traces = AverageTraces(model_vals, container.sample_length)

        r2s = np.empty((num_steps, model_vals, container.sample_length))

        model_input = np.arange(model_vals, dtype='uint8')[..., np.newaxis]
        hypotheses = self.model_value.calculate_table([model_input, None, None, None])

        traces_processed = 0
        converge_index = 0
        for batch in container.get_batches(tile_x, tile_y):
            if traces_processed >= self.convergence_step:
                # Intermediate convergence snapshot (betas discarded).
                instant_r2s, _ = self.calculate(average_traces, hypotheses)
                r2s[converge_index, :, :] = instant_r2s
                traces_processed = 0
                converge_index += 1
            # Update
            plaintext = batch[0]
            traces = batch[-1]
            average_traces = self.update(traces, plaintext, average_traces)
            traces_processed += traces.shape[0]

        # Final snapshot over all traces; its betas are the reported ones.
        instant_r2s, betas = self.calculate(average_traces, hypotheses)
        r2s[converge_index, :, :] = instant_r2s
        # NOTE(review): the candidate is taken from r2s[-1]; this assumes the
        # final snapshot lands in the last slot (converge_index == num_steps-1),
        # which depends on container.configure's num_steps contract — verify.
        candidate = self.find_candidate(r2s[-1, ...])

        return tile_x, tile_y, model_pos, candidate, r2s, betas
Loading
Loading