diff --git a/core/src/main/java/com/google/adk/events/EventActions.java b/core/src/main/java/com/google/adk/events/EventActions.java index d183c6f74..24fa5e95a 100644 --- a/core/src/main/java/com/google/adk/events/EventActions.java +++ b/core/src/main/java/com/google/adk/events/EventActions.java @@ -41,6 +41,7 @@ public class EventActions { private ConcurrentMap requestedToolConfirmations = new ConcurrentHashMap<>(); private Optional endInvocation = Optional.empty(); + private Optional compaction = Optional.empty(); /** Default constructor for Jackson. */ public EventActions() {} @@ -139,6 +140,15 @@ public void setEndInvocation(boolean endInvocation) { this.endInvocation = Optional.of(endInvocation); } + @JsonProperty("compaction") + public Optional compaction() { + return compaction; + } + + public void setCompaction(Optional compaction) { + this.compaction = compaction; + } + public static Builder builder() { return new Builder(); } @@ -162,7 +172,8 @@ public boolean equals(Object o) { && Objects.equals(escalate, that.escalate) && Objects.equals(requestedAuthConfigs, that.requestedAuthConfigs) && Objects.equals(requestedToolConfirmations, that.requestedToolConfirmations) - && Objects.equals(endInvocation, that.endInvocation); + && Objects.equals(endInvocation, that.endInvocation) + && Objects.equals(compaction, that.compaction); } @Override @@ -175,7 +186,8 @@ public int hashCode() { escalate, requestedAuthConfigs, requestedToolConfirmations, - endInvocation); + endInvocation, + compaction); } /** Builder for {@link EventActions}. */ @@ -190,6 +202,7 @@ public static class Builder { private ConcurrentMap requestedToolConfirmations = new ConcurrentHashMap<>(); private Optional endInvocation = Optional.empty(); + private Optional compaction = Optional.empty(); public Builder() {} @@ -203,6 +216,7 @@ private Builder(EventActions eventActions) { this.requestedToolConfirmations = new ConcurrentHashMap<>(eventActions.requestedToolConfirmations()); this.endInvocation = eventActions.endInvocation(); + this.compaction = eventActions.compaction(); } @CanIgnoreReturnValue @@ -262,6 +276,13 @@ public Builder endInvocation(boolean endInvocation) { return this; } + @CanIgnoreReturnValue + @JsonProperty("compaction") + public Builder compaction(EventCompaction value) { + this.compaction = Optional.ofNullable(value); + return this; + } + @CanIgnoreReturnValue public Builder merge(EventActions other) { if (other.skipSummarization().isPresent()) { @@ -288,6 +309,9 @@ public Builder merge(EventActions other) { if (other.endInvocation().isPresent()) { this.endInvocation = other.endInvocation(); } + if (other.compaction().isPresent()) { + this.compaction = other.compaction(); + } return this; } @@ -301,6 +325,7 @@ public EventActions build() { eventActions.setRequestedAuthConfigs(this.requestedAuthConfigs); eventActions.setRequestedToolConfirmations(this.requestedToolConfirmations); eventActions.setEndInvocation(this.endInvocation); + eventActions.setCompaction(this.compaction); return eventActions; } } diff --git a/core/src/main/java/com/google/adk/events/EventCompaction.java b/core/src/main/java/com/google/adk/events/EventCompaction.java new file mode 100644 index 000000000..16dcb4d4a --- /dev/null +++ b/core/src/main/java/com/google/adk/events/EventCompaction.java @@ -0,0 +1,47 @@ +package com.google.adk.events; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.auto.value.AutoValue; +import com.google.genai.types.Content; + +/** The compaction of the events. */ +@AutoValue +@JsonDeserialize(builder = EventCompaction.Builder.class) +public abstract class EventCompaction { + + @JsonProperty("startTimestamp") + public abstract long startTimestamp(); + + @JsonProperty("endTimestamp") + public abstract long endTimestamp(); + + @JsonProperty("compactedContent") + public abstract Content compactedContent(); + + public static Builder builder() { + return new AutoValue_EventCompaction.Builder(); + } + + /** Builder for {@link EventCompaction}. */ + @AutoValue.Builder + public abstract static class Builder { + + @JsonCreator + static Builder create() { + return builder(); + } + + @JsonProperty("startTimestamp") + public abstract Builder startTimestamp(long startTimestamp); + + @JsonProperty("endTimestamp") + public abstract Builder endTimestamp(long endTimestamp); + + @JsonProperty("compactedContent") + public abstract Builder compactedContent(Content compactedContent); + + public abstract EventCompaction build(); + } +} diff --git a/core/src/main/java/com/google/adk/summarizer/BaseEventSummarizer.java b/core/src/main/java/com/google/adk/summarizer/BaseEventSummarizer.java new file mode 100644 index 000000000..a10799dbf --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/BaseEventSummarizer.java @@ -0,0 +1,23 @@ +package com.google.adk.summarizer; + +import com.google.adk.events.Event; +import io.reactivex.rxjava3.core.Maybe; +import java.util.List; + +/** Base interface for producing events summary. */ +public interface BaseEventSummarizer { + + /** + * Compact a list of events into a single event. + * + *

If compaction failed, return {@link Maybe#empty()}. Otherwise, compact into a content and + * return it. + * + *

This method will summarize the events and return a new summary event indicating the range of + * events it summarized. + * + * @param events Events to compact. + * @return The new compacted event, or {@link Maybe#empty()} if no compaction happened. + */ + Maybe summarizeEvents(List events); +} diff --git a/core/src/main/java/com/google/adk/summarizer/EventCompactor.java b/core/src/main/java/com/google/adk/summarizer/EventCompactor.java new file mode 100644 index 000000000..27ac67de7 --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/EventCompactor.java @@ -0,0 +1,20 @@ +package com.google.adk.summarizer; + +import com.google.adk.events.Event; +import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.Session; +import io.reactivex.rxjava3.core.Maybe; + +/** Base interface for compacting events. */ +public interface EventCompactor { + + /** + * Compacts events in the given session. If there is compaction happened, the new compaction event + * will be appended to the given {@link BaseSessionService}. + * + * @param session the session containing the events to be compacted. + * @param sessionService the session service for appending the new compaction event. + * @return the {@link Event} containing the events summary. + */ + Maybe compact(Session session, BaseSessionService sessionService); +} diff --git a/core/src/main/java/com/google/adk/summarizer/EventsCompactionConfig.java b/core/src/main/java/com/google/adk/summarizer/EventsCompactionConfig.java new file mode 100644 index 000000000..7fad2b962 --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/EventsCompactionConfig.java @@ -0,0 +1,21 @@ +package com.google.adk.summarizer; + +import java.util.Optional; + +/** + * Configuration for event compaction. + * + * @param compactionInterval The number of new user-initiated invocations that, once fully + * represented in the session's events, will trigger a compaction. + * @param overlapSize The number of preceding invocations to include from the end of the last + * compacted range. This creates an overlap between consecutive compacted summaries, maintaining + * context. + * @param summarizer An optional event summarizer to use for compaction. + */ +public record EventsCompactionConfig( + int compactionInterval, int overlapSize, Optional summarizer) { + + public EventsCompactionConfig(int compactionInterval, int overlapSize) { + this(compactionInterval, overlapSize, Optional.empty()); + } +} diff --git a/core/src/main/java/com/google/adk/summarizer/LlmEventSummarizer.java b/core/src/main/java/com/google/adk/summarizer/LlmEventSummarizer.java new file mode 100644 index 000000000..879b66402 --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/LlmEventSummarizer.java @@ -0,0 +1,101 @@ +package com.google.adk.summarizer; + +import static java.util.function.Predicate.not; +import static java.util.stream.Collectors.joining; + +import com.google.adk.events.Event; +import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; +import com.google.adk.models.BaseLlm; +import com.google.adk.models.LlmRequest; +import com.google.common.collect.ImmutableList; +import com.google.genai.types.Content; +import com.google.genai.types.Part; +import io.reactivex.rxjava3.core.Maybe; +import java.util.List; +import java.util.Optional; + +/** An LLM-based event summarizer for sliding window compaction. */ +public final class LlmEventSummarizer implements BaseEventSummarizer { + + private static final String DEFAULT_PROMPT_TEMPLATE = + """ + The following is a conversation history between a user and an AI \ + agent. Please summarize the conversation, focusing on key \ + information and decisions made, as well as any unresolved \ + questions or tasks. The summary should be concise and capture the \ + essence of the interaction. + + {conversation_history} + """; + + private final BaseLlm baseLlm; + private final String promptTemplate; + + public LlmEventSummarizer(BaseLlm baseLlm) { + this(baseLlm, DEFAULT_PROMPT_TEMPLATE); + } + + public LlmEventSummarizer(BaseLlm baseLlm, String promptTemplate) { + this.baseLlm = baseLlm; + this.promptTemplate = promptTemplate; + } + + @Override + public Maybe summarizeEvents(List events) { + if (events.isEmpty()) { + return Maybe.empty(); + } + + String conversationHistory = formatEventsForPrompt(events); + String prompt = promptTemplate.replace("{conversation_history}", conversationHistory); + + LlmRequest llmRequest = + LlmRequest.builder() + .model(baseLlm.model()) + .contents( + ImmutableList.of( + Content.builder() + .role("user") + .parts(ImmutableList.of(Part.fromText(prompt))) + .build())) + .build(); + + return baseLlm + .generateContent(llmRequest, false) + .firstElement() + .flatMap( + llmResponse -> + Maybe.fromOptional( + llmResponse + .content() + .map(content -> content.toBuilder().role("model").build()) + .map( + summaryContent -> + EventCompaction.builder() + .startTimestamp(events.get(0).timestamp()) + .endTimestamp(events.get(events.size() - 1).timestamp()) + .compactedContent(summaryContent) + .build()) + .map( + compaction -> + Event.builder() + .author("user") + .actions(EventActions.builder().compaction(compaction).build()) + .invocationId(Event.generateEventId()) + .build()))); + } + + private String formatEventsForPrompt(List events) { + return events.stream() + .flatMap( + event -> + event.content().flatMap(Content::parts).stream() + .flatMap(List::stream) + .map(Part::text) + .flatMap(Optional::stream) + .filter(not(String::isEmpty)) + .map(text -> event.author() + ": " + text)) + .collect(joining("\\n")); + } +} diff --git a/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java new file mode 100644 index 000000000..28264a7db --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java @@ -0,0 +1,147 @@ +package com.google.adk.summarizer; + +import com.google.adk.events.Event; +import com.google.adk.events.EventCompaction; +import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.Session; +import com.google.common.collect.Lists; +import io.reactivex.rxjava3.core.Maybe; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +/** + * This class performs events compaction in a sliding window fashion based on the {@link + * EventsCompactionConfig}. + */ +public final class SlidingWindowEventCompactor implements EventCompactor { + + private final EventsCompactionConfig config; + private final BaseEventSummarizer summarizer; + + public SlidingWindowEventCompactor(EventsCompactionConfig config) { + this.config = config; + // TODO default to LLM summarizer + this.summarizer = config.summarizer().orElseThrow(); + } + + /** + * Runs compaction for SlidingWindowCompactor. + * + *

This method implements the sliding window compaction logic. It determines if enough new + * invocations have occurred since the last compaction based on {@link + * EventsCompactionConfig#compactionInterval()}. If so, it selects a range of events to compact + * based on {@link EventsCompactionConfig#overlapSize()}, and calls {@link + * BaseEventSummarizer#summarizeEvents(List)}. + * + *

The compaction process is controlled by two parameters: + * + *

1. {@link EventsCompactionConfig#compactionInterval()}: The number of *new* user-initiated + * invocations that, once fully represented in the session's events, will trigger a compaction. 2. + * `overlap_size`: The number of preceding invocations to include from the end of the last + * compacted range. This creates an overlap between consecutive compacted summaries, maintaining + * context. + * + *

The compactor is called after an agent has finished processing a turn and all its events + * have been added to the session. It checks if a new compaction is needed. + * + *

When a compaction is triggered: - The compactor identifies the range of `invocation_id`s to + * be summarized. - This range starts `overlap_size` invocations before the beginning of the new + * block of `compaction_invocation_threshold` invocations and ends with the last invocation in the + * current block. - A `CompactedEvent` is created, summarizing all events within this determined + * `invocation_id` range. This `CompactedEvent` is then appended to the session. + * + *

Here is an example with `compaction_invocation_threshold = 2` and `overlap_size = 1`: Let's + * assume events are added for `invocation_id`s 1, 2, 3, and 4 in order. + * + *

1. **After `invocation_id` 2 events are added:** - The session now contains events for + * invocations 1 and 2. This fulfills the `compaction_invocation_threshold = 2` criteria. - Since + * this is the first compaction, the range starts from the beginning. - A `CompactedEvent` is + * generated, summarizing events within `invocation_id` range [1, 2]. - The session now contains: + * `[ E(inv=1, role=user), E(inv=1, role=model), E(inv=2, role=user), E(inv=2, role=model), + * CompactedEvent(inv=[1, 2])]`. + * + *

2. **After `invocation_id` 3 events are added:** - No compaction happens yet, because only 1 + * new invocation (`inv=3`) has been completed since the last compaction, and + * `compaction_invocation_threshold` is 2. + * + *

3. **After `invocation_id` 4 events are added:** - The session now contains new events for + * invocations 3 and 4, again fulfilling `compaction_invocation_threshold = 2`. - The last + * `CompactedEvent` covered up to `invocation_id` 2. With `overlap_size = 1`, the new compaction + * range will start one invocation before the new block (inv 3), which is `invocation_id` 2. - The + * new compaction range is from `invocation_id` 2 to 4. - A new `CompactedEvent` is generated, + * summarizing events within `invocation_id` range [2, 4]. - The session now contains: `[ E(inv=1, + * role=user), E(inv=1, role=model), E(inv=2, role=user), E(inv=2, role=model), + * CompactedEvent(inv=[1, 2]), E(inv=3, role=user), E(inv=3, role=model), E(inv=4, role=user), + * E(inv=4, role=model), CompactedEvent(inv=[2, 4])]`. + */ + @Override + public Maybe compact(Session session, BaseSessionService sessionService) { + return getCompactionEvents(session) + .flatMap(summarizer::summarizeEvents) + .flatMapSingle(e -> sessionService.appendEvent(session, e)); + } + + private Maybe> getCompactionEvents(Session session) { + List eventsToCompact = new ArrayList<>(); + Set invocationsToCompact = new HashSet<>(); + long lastCompactTimestamp = -1L; + int targetSize = -1; + + // Scan the list of events backward so that timestamp are in decreasing fashion. + ListIterator iter = session.events().listIterator(session.events().size()); + while (iter.hasPrevious()) { + Event event = iter.previous(); + String invocationId = event.invocationId(); + + // For regular event, there should be an invocation id. + if (invocationId != null && !isCompactEvent(event)) { + // If an invocation is included for compaction, include all the events for that invocation + if (invocationsToCompact.contains(invocationId)) { + eventsToCompact.add(event); + continue; + } + // When encountered an event that is already compacted, there are possible scenarios + // 1. Not enough uncompacted invocations as defined by the "compactionInterval", we can + // break without compaction needed. + // 2. Enough uncompacted invocations, hence we need to keep adding "overlapSize" more of + // invocations. + if (event.timestamp() <= lastCompactTimestamp) { + if (invocationsToCompact.size() < config.compactionInterval()) { + break; + } + if (targetSize < 0) { + targetSize = invocationsToCompact.size() + config.overlapSize(); + } + } + // Adds the event to be compacted until enough is accumulated based on the configuration + if (targetSize < 0 || invocationsToCompact.size() < targetSize) { + eventsToCompact.add(event); + invocationsToCompact.add(invocationId); + } else { + break; + } + } else if (isCompactEvent(event)) { + // Record the latest compaction timestamp + lastCompactTimestamp = + Long.max( + lastCompactTimestamp, + event.actions().compaction().map(EventCompaction::endTimestamp).orElse(-1L)); + } + } + + // Compaction threshold is not met, no compaction needed + if (invocationsToCompact.size() < config.compactionInterval()) { + return Maybe.empty(); + } + + // The events were added backward, reserve it back to prepare for compaction + return Maybe.just(Lists.reverse(eventsToCompact)); + } + + private static boolean isCompactEvent(Event event) { + return event.actions() != null && event.actions().compaction().isPresent(); + } +} diff --git a/core/src/test/java/com/google/adk/events/EventActionsTest.java b/core/src/test/java/com/google/adk/events/EventActionsTest.java index 12684371c..faa389c6d 100644 --- a/core/src/test/java/com/google/adk/events/EventActionsTest.java +++ b/core/src/test/java/com/google/adk/events/EventActionsTest.java @@ -20,6 +20,7 @@ import com.google.adk.tools.ToolConfirmation; import com.google.common.collect.ImmutableMap; +import com.google.genai.types.Content; import com.google.genai.types.Part; import java.util.concurrent.ConcurrentHashMap; import org.junit.Test; @@ -30,17 +31,25 @@ public final class EventActionsTest { private static final Part PART = Part.builder().text("text").build(); + private static final Content CONTENT = Content.builder().parts(PART).build(); private static final ToolConfirmation TOOL_CONFIRMATION = ToolConfirmation.builder().hint("hint").confirmed(true).build(); + private static final EventCompaction COMPACTION = + EventCompaction.builder() + .startTimestamp(123L) + .endTimestamp(456L) + .compactedContent(CONTENT) + .build(); @Test public void toBuilder_createsBuilderWithSameValues() { EventActions eventActionsWithSkipSummarization = - EventActions.builder().skipSummarization(true).build(); + EventActions.builder().skipSummarization(true).compaction(COMPACTION).build(); EventActions eventActionsAfterRebuild = eventActionsWithSkipSummarization.toBuilder().build(); assertThat(eventActionsAfterRebuild).isEqualTo(eventActionsWithSkipSummarization); + assertThat(eventActionsAfterRebuild.compaction()).hasValue(COMPACTION); } @Test @@ -55,6 +64,7 @@ public void merge_mergesAllFields() { ImmutableMap.of("config1", new ConcurrentHashMap<>(ImmutableMap.of("k", "v"))))) .requestedToolConfirmations( new ConcurrentHashMap<>(ImmutableMap.of("tool1", TOOL_CONFIRMATION))) + .compaction(COMPACTION) .build(); EventActions eventActions2 = EventActions.builder() @@ -86,5 +96,6 @@ public void merge_mergesAllFields() { assertThat(merged.requestedToolConfirmations()) .containsExactly("tool1", TOOL_CONFIRMATION, "tool2", TOOL_CONFIRMATION); assertThat(merged.endInvocation()).hasValue(true); + assertThat(merged.compaction()).hasValue(COMPACTION); } } diff --git a/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java new file mode 100644 index 000000000..8fcc104ec --- /dev/null +++ b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java @@ -0,0 +1,185 @@ +package com.google.adk.summarizer; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.adk.events.Event; +import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; +import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.Session; +import com.google.common.collect.ImmutableList; +import com.google.common.truth.Correspondence; +import com.google.genai.types.Content; +import com.google.genai.types.Part; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; +import java.util.List; +import java.util.Optional; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class SlidingWindowEventCompactorTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + @Mock private BaseSessionService mockSessionService; + @Mock BaseEventSummarizer mockSummarizer; + @Captor ArgumentCaptor> eventListCaptor; + + @Test + public void compaction_noEvents() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 1, Optional.of(mockSummarizer))); + Session session = Session.builder("id").build(); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + verify(mockSessionService, never()).appendEvent(any(), any()); + } + + @Test + public void compaction_notEnoughInvocations() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 1, Optional.of(mockSummarizer))); + Session session = + Session.builder("id") + .events(ImmutableList.of(Event.builder().invocationId("1").build())) + .build(); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + verify(mockSessionService, never()).appendEvent(any(), any()); + } + + @Test + public void compaction_firstCompaction() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 1, Optional.of(mockSummarizer))); + // Add 4 events without any compaction event + ImmutableList events = + ImmutableList.of( + Event.builder().invocationId("1").timestamp(1).build(), + Event.builder().invocationId("2").timestamp(2).build(), + Event.builder().invocationId("3").timestamp(3).build(), + Event.builder().invocationId("4").timestamp(4).build()); + Session session = Session.builder("id").events(events).build(); + Event compactedEvent = createCompactedEvent(1, 4, "Summary 1-4"); + when(mockSummarizer.summarizeEvents(any())).thenReturn(Maybe.just(compactedEvent)); + when(mockSessionService.appendEvent(any(), any())).then(i -> Single.just(i.getArgument(1))); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + // Even with the interval = 2 and overlap = 1, all 4 events should be included + verify(mockSummarizer).summarizeEvents(eq(events)); + verify(mockSessionService).appendEvent(eq(session), eq(compactedEvent)); + } + + @Test + public void compaction_withOverlap() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 1, Optional.of(mockSummarizer))); + // First 2 events are compacted, plus three uncompacted events + ImmutableList events = + ImmutableList.of( + Event.builder().invocationId("1").timestamp(1).build(), + Event.builder().invocationId("2").timestamp(2).build(), + createCompactedEvent(1, 2, "Summary 1-2"), + Event.builder().invocationId("3").timestamp(3).build(), + Event.builder().invocationId("4").timestamp(4).build(), + Event.builder().invocationId("5").timestamp(5).build()); + Session session = Session.builder("id").events(events).build(); + Event compactedEvent = createCompactedEvent(2, 5, "Summary 2-5"); + when(mockSummarizer.summarizeEvents(any())).thenReturn(Maybe.just(compactedEvent)); + when(mockSessionService.appendEvent(any(), any())).then(i -> Single.just(i.getArgument(1))); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + + // Should include events 2-5. + verify(mockSummarizer).summarizeEvents(eventListCaptor.capture()); + assertThat(eventListCaptor.getValue()) + .comparingElementsUsing( + Correspondence.from( + (actual, expected) -> actual.invocationId().equals(expected), "")) + .containsExactly("2", "3", "4", "5"); + verify(mockSessionService).appendEvent(eq(session), eq(compactedEvent)); + } + + @Test + public void compaction_multipleEventsWithSameInvocation() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(1, 1, Optional.of(mockSummarizer))); + ImmutableList events = + ImmutableList.of( + Event.builder().invocationId("1").timestamp(1).build(), + Event.builder().invocationId("1").timestamp(2).build(), + createCompactedEvent(1, 2, "Summary 1"), + Event.builder().invocationId("2").timestamp(3).build(), + Event.builder().invocationId("2").timestamp(4).build()); + Session session = Session.builder("id").events(events).build(); + Event compactedEvent = createCompactedEvent(1, 4, "Summary 1-2"); + when(mockSummarizer.summarizeEvents(any())).thenReturn(Maybe.just(compactedEvent)); + when(mockSessionService.appendEvent(any(), any())).then(i -> Single.just(i.getArgument(1))); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + + // Should include invocations 1-2, with all 4 events. + verify(mockSummarizer).summarizeEvents(eventListCaptor.capture()); + assertThat(eventListCaptor.getValue()) + .comparingElementsUsing( + Correspondence.from( + (actual, expected) -> actual.timestamp() == expected, "")) + .containsExactly(1L, 2L, 3L, 4L); + + verify(mockSessionService).appendEvent(eq(session), eq(compactedEvent)); + } + + @Test + public void compaction_noCompactionEventFromSummarizer() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(1, 0, Optional.of(mockSummarizer))); + ImmutableList events = + ImmutableList.of(Event.builder().invocationId("1").timestamp(1).build()); + Session session = Session.builder("id").events(events).build(); + when(mockSummarizer.summarizeEvents(any())).thenReturn(Maybe.empty()); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + + // The summarizer should get called since interval = 1 + verify(mockSummarizer).summarizeEvents(eq(events)); + // No compaction event produced since the summarize returns empty. + verify(mockSessionService, never()).appendEvent(any(), any()); + } + + private Event createCompactedEvent(long startTimestamp, long endTimestamp, String content) { + return Event.builder() + .actions( + EventActions.builder() + .compaction( + EventCompaction.builder() + .startTimestamp(startTimestamp) + .endTimestamp(endTimestamp) + .compactedContent( + Content.builder() + .role("model") + .parts(Part.builder().text(content).build()) + .build()) + .build()) + .build()) + .build(); + } +}