diff --git a/iterable-extension/src/main/java/com/mparticle/ext/iterable/ErrorResponse.java b/iterable-extension/src/main/java/com/mparticle/ext/iterable/ErrorResponse.java new file mode 100644 index 0000000..627f1c6 --- /dev/null +++ b/iterable-extension/src/main/java/com/mparticle/ext/iterable/ErrorResponse.java @@ -0,0 +1,36 @@ +package com.mparticle.ext.iterable; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +public class ErrorResponse { + @JsonProperty(value="statusCode", required=true) + public int statusCode; + + @JsonProperty(value="body", required=true) + public Map body; + + public ErrorResponse() {} + + public ErrorResponse(int statusCode, Map body) { + this.statusCode = statusCode; + this.body = body; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public Map getBody() { + return body; + } + + public void setBody(Map body) { + this.body = body; + } +} diff --git a/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableExtension.java b/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableExtension.java index e42656a..8aa28de 100644 --- a/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableExtension.java +++ b/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableExtension.java @@ -80,11 +80,7 @@ private void processPushOpens(EventProcessingRequest processingRequest) throws I } request.createdAt = (int) (event.getTimestamp() / 1000.0); Response response = iterableService.trackPushOpen(getApiKey(processingRequest), request).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending push-open to Iterable: HTTP " + response.code()); - } + handleIterableResponse(response); } } } @@ -167,11 +163,7 @@ public void processPushSubscriptionEvent(PushSubscriptionEvent event) throws IOE } Response response = iterableService.registerToken(getApiKey(event), request).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending push subscription to Iterable: " + response.body().toString()); - } + handleIterableResponse(response); } void updateUser(EventProcessingRequest request) throws IOException { @@ -203,12 +195,7 @@ void updateUser(EventProcessingRequest request) throws IOException { //this is safe due to the filters above updateEmailRequest.newEmail = changeEvent.getAdded().get(0).getValue(); Response response = iterableService.updateEmail(getApiKey(request), updateEmailRequest).execute(); - if (response.isSuccessful()) { - IterableApiResponse apiResponse = response.body(); - if (apiResponse != null && !apiResponse.isSuccess()) { - throw new IOException("Error while calling updateEmail() on iterable: HTTP " + apiResponse.code); - } - } + handleIterableResponse(response); } //convert from old to new email @@ -218,12 +205,7 @@ void updateUser(EventProcessingRequest request) throws IOException { updateEmailRequest.currentEmail = changeEvent.getRemoved().get(0).getValue(); updateEmailRequest.newEmail = changeEvent.getAdded().get(0).getValue(); Response response = iterableService.updateEmail(getApiKey(request), updateEmailRequest).execute(); - if (response.isSuccessful()) { - IterableApiResponse apiResponse = response.body(); - if (apiResponse != null && !apiResponse.isSuccess()) { - throw new IOException("Error while calling updateEmail() on iterable: HTTP " + apiResponse.code); - } - } + handleIterableResponse(response); } } @@ -233,12 +215,7 @@ void updateUser(EventProcessingRequest request) throws IOException { if (!isEmpty(userUpdateRequest.email) || !isEmpty(userUpdateRequest.userId)) { userUpdateRequest.dataFields = convertAttributes(request.getUserAttributes(), shouldCoerceStrings(request)); Response response = iterableService.userUpdate(getApiKey(request), userUpdateRequest).execute(); - if (response.isSuccessful()) { - IterableApiResponse apiResponse = response.body(); - if (apiResponse != null && !apiResponse.isSuccess()) { - throw new IOException("Error while calling updateUser() on iterable: HTTP " + apiResponse.code); - } - } + handleIterableResponse(response); } } } @@ -280,6 +257,9 @@ private static boolean isEmpty(CharSequence chars) { public void processProductActionEvent(ProductActionEvent event) throws IOException { if (event.getAction().equals(ProductActionEvent.Action.PURCHASE)) { TrackPurchaseRequest purchaseRequest = new TrackPurchaseRequest(); + if (event.getId() != null) { + purchaseRequest.id = event.getId().toString(); + } purchaseRequest.createdAt = (int) (event.getTimestamp() / 1000.0); ApiUser apiUser = new ApiUser(); addUserIdentitiesToRequest(apiUser, event.getRequest()); @@ -294,11 +274,7 @@ public void processProductActionEvent(ProductActionEvent event) throws IOExcepti } Response response = iterableService.trackPurchase(getApiKey(event), purchaseRequest).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending custom event to Iterable: HTTP " + response.code()); - } + handleIterableResponse(response); } } @@ -495,6 +471,9 @@ public ModuleRegistrationResponse processRegistrationRequest(ModuleRegistrationR Event.Type.USER_IDENTITY_CHANGE, Event.Type.PRODUCT_ACTION); + // Specify an unlimited maximum data age (primarily for Event Replays) + eventProcessingRegistration.setMaxDataAgeHours(-1); + eventProcessingRegistration.setSupportedEventTypes(supportedEventTypes); response.setEventProcessingRegistration(eventProcessingRegistration); AudienceProcessingRegistration audienceRegistration = new AudienceProcessingRegistration(); @@ -552,11 +531,7 @@ private boolean processSubscribeEvent(CustomEvent event) throws IOException { return false; } Response response = iterableService.updateSubscriptions(getApiKey(event), updateRequest).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending update subscriptions event to Iterable: HTTP " + response.code()); - } + handleIterableResponse(response); return true; } @@ -608,16 +583,13 @@ public void processCustomEvent(CustomEvent event) throws IOException { } TrackRequest request = new TrackRequest(event.getName()); + request.id = event.getId().toString(); request.createdAt = (int) (event.getTimestamp() / 1000.0); request.dataFields = attemptTypeConversion(event.getAttributes()); addUserIdentitiesToRequest(request, event.getRequest()); Response response = iterableService.track(getApiKey(event), request).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending custom event to Iterable: HTTP " + response.code()); - } + handleIterableResponse(response); } /** @@ -687,11 +659,7 @@ public void processPushMessageReceiptEvent(PushMessageReceiptEvent event) throws } request.createdAt = (int) (event.getTimestamp() / 1000.0); Response response = iterableService.trackPushOpen(getApiKey(event), request).execute(); - if (response.isSuccessful() && !response.body().isSuccess()) { - throw new IOException(response.body().toString()); - } else if (!response.isSuccessful()) { - throw new IOException("Error sending push-open to Iterable: HTTP " + response.code()); - } + handleIterableResponse(response); } } } @@ -805,4 +773,16 @@ private boolean hasBundledSDK(EventProcessingRequest processingRequest) { integrationAttributes.getOrDefault("Iterable.sdkVersion", null) != null; } + static void handleIterableResponse(Response iterableResponse) throws IOException { + if (iterableResponse.isSuccessful() && !iterableResponse.body().isSuccess()) { + throw new IOException(iterableResponse.body().toString()); + } else if (!iterableResponse.isSuccessful()) { + if (iterableResponse.code() == 429) { + throw new TooManyRequestsException(); + } else { + throw new IOException("Error sending custom event to Iterable: HTTP " + iterableResponse.code()); + } + } + } + } diff --git a/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableLambdaEndpoint.java b/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableLambdaEndpoint.java index fd4220b..95ade72 100644 --- a/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableLambdaEndpoint.java +++ b/iterable-extension/src/main/java/com/mparticle/ext/iterable/IterableLambdaEndpoint.java @@ -1,7 +1,6 @@ package com.mparticle.ext.iterable; import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.mparticle.sdk.model.Message; import com.mparticle.sdk.model.MessageSerializer; @@ -9,6 +8,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; public class IterableLambdaEndpoint implements RequestStreamHandler { @@ -19,7 +20,21 @@ public class IterableLambdaEndpoint implements RequestStreamHandler { @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { Message request = serializer.deserialize(input, Message.class); - Message response = processor.processMessage(request); - serializer.serialize(output, response); + + try { + Message message = processor.processMessage(request); + SuccessResponse success = new SuccessResponse(200, message); + serializer.serialize(output, success); + } catch (TooManyRequestsException e) { + Map body = new HashMap<>(); + body.put("message", "Iterable rate limit exceeded"); + ErrorResponse error = new ErrorResponse(429, body); + serializer.serialize(output, error); + } catch (IOException e) { + Map body = new HashMap<>(); + body.put("message", e.getMessage()); + ErrorResponse error = new ErrorResponse(500, body); + serializer.serialize(output, error); + } } } \ No newline at end of file diff --git a/iterable-extension/src/main/java/com/mparticle/ext/iterable/SuccessResponse.java b/iterable-extension/src/main/java/com/mparticle/ext/iterable/SuccessResponse.java new file mode 100644 index 0000000..486c972 --- /dev/null +++ b/iterable-extension/src/main/java/com/mparticle/ext/iterable/SuccessResponse.java @@ -0,0 +1,35 @@ +package com.mparticle.ext.iterable; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.mparticle.sdk.model.Message; + +public class SuccessResponse { + @JsonProperty(value="statusCode", required=true) + public int statusCode; + + @JsonProperty(value="body", required=true) + public Message body; + + public SuccessResponse() {} + + public SuccessResponse(int statusCode, Message body) { + this.statusCode = statusCode; + this.body = body; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public Message getBody() { + return body; + } + + public void setBody(Message body) { + this.body = body; + } +} diff --git a/iterable-extension/src/main/java/com/mparticle/ext/iterable/TooManyRequestsException.java b/iterable-extension/src/main/java/com/mparticle/ext/iterable/TooManyRequestsException.java new file mode 100644 index 0000000..3486bcf --- /dev/null +++ b/iterable-extension/src/main/java/com/mparticle/ext/iterable/TooManyRequestsException.java @@ -0,0 +1,10 @@ +package com.mparticle.ext.iterable; + +import java.io.IOException; + +public class TooManyRequestsException extends IOException { + + public TooManyRequestsException() { + super(); + } +} diff --git a/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableExtensionTest.java b/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableExtensionTest.java index 463cc8b..100fc7a 100644 --- a/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableExtensionTest.java +++ b/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableExtensionTest.java @@ -9,6 +9,7 @@ import com.mparticle.sdk.model.registration.ModuleRegistrationResponse; import com.mparticle.sdk.model.registration.Setting; import com.mparticle.sdk.model.registration.UserIdentityPermission; +import okhttp3.ResponseBody; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -248,6 +249,7 @@ public void testProcessCustomEvent() throws Exception { assertEquals("123456", argument.getValue().userId); assertEquals("some attribute value", argument.getValue().dataFields.get("some attribute key")); assertEquals((int) (timeStamp / 1000.0), argument.getValue().createdAt + 0); + assertNotNull(argument.getValue().id); apiResponse.code = "anything but success"; @@ -623,6 +625,7 @@ public void testProcessProductActionEvent() throws Exception { assertEquals(trackPurchaseRequest.user.userId, "123456"); assertEquals(trackPurchaseRequest.items.size(), 2); assertEquals(trackPurchaseRequest.total, new BigDecimal(101d)); + assertNotNull(trackPurchaseRequest.id); } @Test @@ -790,6 +793,28 @@ public void testUpdateSubscriptionsEvent() throws Exception { assertNotNull("Iterable extension should have thrown an IOException", exception); } + @Test + public void testHandleIterableSuccess() throws IOException{ + IterableApiResponse iterableApiResponse = new IterableApiResponse(); + iterableApiResponse.code = IterableApiResponse.SUCCESS_MESSAGE; + Response r = Response.success(iterableApiResponse); + IterableExtension.handleIterableResponse(r); + } + + @Test(expected = TooManyRequestsException.class) + public void testHandleIterable429() throws IOException { + IterableApiResponse iterableApiResponse = new IterableApiResponse(); + Response r = Response.error(429, ResponseBody.create(null, "")); + IterableExtension.handleIterableResponse(r); + } + + @Test(expected = IOException.class) + public void testHandleIterableError() throws IOException { + IterableApiResponse iterableApiResponse = new IterableApiResponse(); + Response r = Response.error(500, ResponseBody.create(null, "")); + IterableExtension.handleIterableResponse(r); + } + private EventProcessingRequest createEventProcessingRequest() { EventProcessingRequest request = new EventProcessingRequest(); Account account = new Account(); diff --git a/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableLambdaEndpointTest.java b/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableLambdaEndpointTest.java new file mode 100644 index 0000000..82c6d9f --- /dev/null +++ b/iterable-extension/src/test/java/com/mparticle/ext/iterable/IterableLambdaEndpointTest.java @@ -0,0 +1,96 @@ +package com.mparticle.ext.iterable; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mparticle.iterable.IterableApiResponse; +import com.mparticle.iterable.IterableService; +import okhttp3.ResponseBody; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import retrofit2.Call; +import retrofit2.Response; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class IterableLambdaEndpointTest { + + private static final String TEST_INPUT = "{\r\n\t\"type\": \"event_processing_request\",\r\n\t\"id\": \"9365387f-9638-4fb2-88a9-36db9be1be5d\",\r\n\t\"timestamp_ms\": 1591328642162,\r\n\t\"firehose_version\": \"2.2.0\",\r\n\t\"account\": {\r\n\t\t\"account_id\": 0,\r\n\t\t\"account_settings\": {\r\n\t\t\t\"apiKey\": \"foo\",\r\n\t\t\t\"userIdField\": \"customerId\"\r\n\t\t}\r\n\t},\r\n\t\"user_identities\": [{\r\n\t\t\"type\": \"email\",\r\n\t\t\"encoding\": \"raw\",\r\n\t\t\"value\": \"foo@placeholder.email\"\r\n\t}],\r\n\t\"events\": [{\r\n\t\t\"type\": \"user_attribute_change\",\r\n\t\t\"id\": \"a0ab9fd1-f5b2-4119-b08a-dab1669b852d\",\r\n\t\t\"timestamp_ms\": 2\r\n\t}, {\r\n\t\t\"type\": \"user_attribute_change\",\r\n\t\t\"id\": \"ba66bb9f-1770-4006-8072-3351a7449be4\",\r\n\t\t\"timestamp_ms\": 3\r\n\t}],\r\n\t\"device_application_stamp\": \"foo\",\r\n\t\"mpid\": \"1234567890\"\r\n}"; + + private IterableExtension extension; + private IterableLambdaEndpoint lambda; + + @Before + public void setup() throws IOException { + extension = new IterableExtension(); + lambda = new IterableLambdaEndpoint(); + } + + @Test + public void testSuccessResponse() throws IOException { + // Mock successful calls to Iterable + lambda.processor.iterableService = Mockito.mock(IterableService.class); + Call callMock = Mockito.mock(Call.class); + Mockito.when(lambda.processor.iterableService.userUpdate(Mockito.any(), Mockito.any())) + .thenReturn(callMock); + IterableApiResponse apiResponse = new IterableApiResponse(); + apiResponse.code = IterableApiResponse.SUCCESS_MESSAGE; + Response iterableResponse = Response.success(apiResponse); + Mockito.when(callMock.execute()).thenReturn(iterableResponse); + + InputStream input = new ByteArrayInputStream(TEST_INPUT.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + lambda.handleRequest(input, output, null); + ObjectMapper mapper = new ObjectMapper(); + SuccessResponse lambdaResponse = mapper.readValue(new String(output.toByteArray()), SuccessResponse.class); + assertEquals(200, lambdaResponse.getStatusCode()); + assertNotNull(lambdaResponse.getBody().getFirehoseVersion()); + } + + @Test + public void testTooManyRequestsResponse() throws IOException { + // Mock 429 response from Iterable + lambda.processor.iterableService = Mockito.mock(IterableService.class); + Call callMock = Mockito.mock(Call.class); + Mockito.when(lambda.processor.iterableService.userUpdate(Mockito.any(), Mockito.any())) + .thenReturn(callMock); + Response iterableResponse = Response.error(429, ResponseBody.create(null, "")); + Mockito.when(callMock.execute()).thenReturn(iterableResponse); + + InputStream input = new ByteArrayInputStream(TEST_INPUT.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + lambda.handleRequest(input, output, null); + + ObjectMapper mapper = new ObjectMapper(); + ErrorResponse lambdaResponse = mapper.readValue(new String(output.toByteArray()), ErrorResponse.class); + assertEquals(429, lambdaResponse.getStatusCode()); + assertNotNull(lambdaResponse.getBody()); + } + + @Test + public void testGenericErrorResponse() throws IOException { + // Mock non-429 error response from Iterable + lambda.processor.iterableService = Mockito.mock(IterableService.class); + Call callMock = Mockito.mock(Call.class); + Mockito.when(lambda.processor.iterableService.userUpdate(Mockito.any(), Mockito.any())) + .thenReturn(callMock); + Response iterableResponse = Response.error(400, ResponseBody.create(null, "")); + Mockito.when(callMock.execute()).thenReturn(iterableResponse); + + InputStream input = new ByteArrayInputStream(TEST_INPUT.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + lambda.handleRequest(input, output, null); + + ObjectMapper mapper = new ObjectMapper(); + ErrorResponse lambdaResponse = mapper.readValue(new String(output.toByteArray()), ErrorResponse.class); + assertEquals(500, lambdaResponse.getStatusCode()); + assertNotNull(lambdaResponse.getBody()); + } +} diff --git a/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackPurchaseRequest.java b/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackPurchaseRequest.java index fdab40b..84f715f 100644 --- a/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackPurchaseRequest.java +++ b/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackPurchaseRequest.java @@ -6,6 +6,11 @@ import java.util.Map; public class TrackPurchaseRequest { + /** + * Optional purchase id. If a purchase already exists with this id, the event will be updated. If none + * is specified, a new id will automatically be generated and returned. + */ + public String id; public ApiUser user; public List items; public Integer campaignId; diff --git a/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackRequest.java b/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackRequest.java index 96ed185..97ed5ba 100644 --- a/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackRequest.java +++ b/iterable-java-sdk/src/main/java/com/mparticle/iterable/TrackRequest.java @@ -18,6 +18,11 @@ public class TrackRequest extends UserRequest { * Additional data associated with event (i.e. item id, item amount), */ public Map dataFields; + /** + * Optional event id. If an event already exists with this id, the event will be updated. If none is specified, + * a new id will automatically be generated and returned. + */ + public String id; /** * Campaign tied to conversion */