Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion KunQuant/Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def push_source(is_simple=False):
is_single_source = split_source == 0
# the set of names of custom cross sectional functions
generated_cross_sectional_func = set()
stream_state_buffer_init = []
for func in impl:
if split_source > 0 and cur_count > split_source:
push_source()
Expand All @@ -232,7 +233,6 @@ def push_source(is_simple=False):
def query_temp_buf_id(tempname: str, window: int) -> int:
input_windows[tempname] = window
return insert_name_str(tempname, "TEMP").idx
stream_state_buffer_init = []
src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, stream_state_buffer_init, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source)
impl_src.append(src)
decl_src.append(decl)
Expand Down
10 changes: 7 additions & 3 deletions cpp/Kun/Runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ template <typename T>
char *StreamBuffer<T>::make(size_t stock_count, size_t window_size,
size_t simd_len) {
auto ret = kunAlignedAlloc(
sizeof(T) * simd_len, StreamBuffer::getBufferSize(stock_count, window_size, simd_len));
KUN_MALLOC_ALIGNMENT,
roundUp(StreamBuffer::getBufferSize(stock_count, window_size, simd_len),
KUN_MALLOC_ALIGNMENT));
auto buf = (StreamBuffer *)ret;
auto data = buf->getBuffer();
auto rounded_stock_count = roundUp(stock_count, simd_len);
Expand Down Expand Up @@ -487,8 +489,10 @@ bool StreamContext::serializeStates(OutputStreamBase* stream) {
StateBuffer *StateBuffer::make(size_t num_objs, size_t elem_size,
CtorFn_t ctor_fn, DtorFn_t dtor_fn, SerializeFn_t serialize_fn,
DeserializeFn_t deserialize_fn) {
auto ret = kunAlignedAlloc(KUN_MALLOC_ALIGNMENT,
sizeof(StateBuffer) + num_objs * elem_size);
auto ret =
kunAlignedAlloc(KUN_MALLOC_ALIGNMENT,
roundUp(sizeof(StateBuffer) + num_objs * elem_size,
KUN_MALLOC_ALIGNMENT));
auto buf = (StateBuffer *)ret;
buf->num_objs = num_objs;
buf->elem_size = elem_size;
Expand Down
27 changes: 27 additions & 0 deletions tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,31 @@ def test_stream_lifetime_gh_issue_41():

####################################

def repro_crash_gh_issue_71():
print("Building factors...")
builder = Builder()
with builder:
inp1 = Input("close")
# Generate MANY factors to stress memory/heap
for i in range(20):
Output(WindowedQuantile(inp1, 10, 0.5 + i * 0.01), f"qtl_{i}")
for i in range(20):
Output(WindowedLinearRegressionSlope(inp1, 10 + i), f"beta_{i}")
f = Function(builder.ops)
return "test_repro_crash_gh_issue_71", f, KunCompilerConfig(partition_factor=3, output_layout="STREAM", options={"opt_reduce": False, "fast_log": True})

def test_repro_crash_gh_issue_71(lib):
num_symbols = 24
executor = kr.createSingleThreadExecutor()
modu = lib.getModule("test_repro_crash_gh_issue_71")
stream = kr.StreamContext(executor, modu, num_symbols)
data = np.random.rand(num_symbols).astype("float32")
h_close = stream.queryBufferHandle("close")
for i in range(20):
stream.pushData(h_close, data)
stream.run()

####################################

def create_stream_double():
builder = Builder()
Expand Down Expand Up @@ -680,6 +705,7 @@ def rolling_max_dd(x, window_size, min_periods=1):
check_covar(),
check_quantile(),
check_large_rank(),
repro_crash_gh_issue_71(),
]
lib = cfake.compileit(funclist, "test", cfake.CppCompilerConfig(machine=get_compiler_flags()))

Expand All @@ -706,4 +732,5 @@ def rolling_max_dd(x, window_size, min_periods=1):
test_covar(lib)
test_quantile(lib)
test_large_rank(lib)
test_repro_crash_gh_issue_71(lib)
print("done")