Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 30 additions & 33 deletions src/main/java/com/hubspot/smtp/client/SmtpSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class SmtpSession {
private volatile boolean requiresRset = false;
private volatile EhloResponse ehloResponse = EhloResponse.EMPTY;

private List<String> localExtensionsList;

SmtpSession(Channel channel, ResponseHandler responseHandler, SmtpSessionConfig config, Executor executor, Supplier<SSLEngine> sslEngineSupplier) {
this.channel = channel;
this.responseHandler = responseHandler;
Expand Down Expand Up @@ -238,6 +240,18 @@ public CompletableFuture<SmtpClientResponse> send(String from, Collection<String
return send(from, recipients, content, Optional.empty());
}

/**
* Sends an email as efficiently as possible, using the extended SMTP features supported by the remote server.
*
* This method behaves as {@link SmtpSession#send(String, Collection, MessageContent, SendInterceptor)} but
* does not use a {@link SendInterceptor} and has the capability to accept local mail extensions that can be sent along
* with mail from command according to RFC 5321. All these local extensions must start with X.
*/
public CompletableFuture<SmtpClientResponse> send(String from, Collection<String> recipients, MessageContent content, List<String> localExtensionsList) {
this.localExtensionsList = localExtensionsList;
return send(from, recipients, content, Optional.empty());
}

/**
* Sends an email as efficiently as possible, using the extended SMTP features supported by the remote server.
*
Expand Down Expand Up @@ -306,20 +320,11 @@ private CompletableFuture<SmtpClientResponse> sendInternal(String from, Collecti
return sendAsChunked(from, recipients, content, sequenceInterceptor);
}

if (content.getEncoding() == MessageContentEncoding.SEVEN_BIT) {
return sendAs7Bit(from, recipients, content, sequenceInterceptor);
}

if (ehloResponse.isSupported(Extension.EIGHT_BIT_MIME)) {
return sendAs8BitMime(from, recipients, content, sequenceInterceptor);
if (content.getEncoding() != MessageContentEncoding.SEVEN_BIT && !ehloResponse.isSupported(Extension.EIGHT_BIT_MIME)) {
content = encodeContentAs7Bit(content);
}

if (content.get8bitCharacterProportion() == 0) {
return sendAs7Bit(from, recipients, content, sequenceInterceptor);
}

// this message is not 7 bit, but the server only supports 7 bit :(
return sendAs7Bit(from, recipients, encodeContentAs7Bit(content), sequenceInterceptor);
return sendMessage(from, recipients, content, sequenceInterceptor);
}

private CompletableFuture<SmtpClientResponse> sendAsChunked(String from, Collection<String> recipients, MessageContent content, Optional<SendInterceptor> sequenceInterceptor) {
Expand Down Expand Up @@ -379,16 +384,10 @@ public Object next() {
};
}

private CompletableFuture<SmtpClientResponse> sendAs7Bit(String from, Collection<String> recipients, MessageContent content, Optional<SendInterceptor> sequenceInterceptor) {
private CompletableFuture<SmtpClientResponse> sendMessage(String from, Collection<String> recipients, MessageContent content, Optional<SendInterceptor> sequenceInterceptor) {
return sendPipelinedIfPossible(mailCommand(from, recipients), recipients, SmtpRequests.data(), sequenceInterceptor)
.thenSend(content.getDotStuffedContent(), DotCrlfBuffer.get())
.toResponses();
}

private CompletableFuture<SmtpClientResponse> sendAs8BitMime(String from, Collection<String> recipients, MessageContent content, Optional<SendInterceptor> sequenceInterceptor) {
return sendPipelinedIfPossible(mailCommandWith8BitMime(from, recipients), recipients, SmtpRequests.data(), sequenceInterceptor)
.thenSend(content.getDotStuffedContent(), DotCrlfBuffer.get())
.toResponses();
.thenSend(content.getDotStuffedContent(), DotCrlfBuffer.get())
.toResponses();
}

private SendSequence sendPipelinedIfPossible(SmtpRequest mailRequest, Collection<String> recipients, SmtpRequest dataRequest, Optional<SendInterceptor> sequenceInterceptor) {
Expand All @@ -414,20 +413,18 @@ private Collection<SmtpRequest> rpctCommands(Collection<String> recipients) {
return recipients.stream().map(SmtpRequests::rcpt).collect(Collectors.toList());
}

private SmtpRequest mailCommand(String from, Collection<String> recipients) {
if (!ehloResponse.isSupported(Extension.SMTPUTF8) || (isAllAscii(from) && isAllAscii(recipients))) {
return SmtpRequests.mail(from);
} else {
return SmtpRequests.mail(from, "SMTPUTF8");
private SmtpRequest mailCommand(String from, Collection<String> recipients){
if(localExtensionsList == null){
localExtensionsList = new ArrayList<>();
}
}

private SmtpRequest mailCommandWith8BitMime(String from, Collection<String> recipients) {
if (!ehloResponse.isSupported(Extension.SMTPUTF8) || (isAllAscii(from) && isAllAscii(recipients))) {
return SmtpRequests.mail(from, "BODY=8BITMIME");
} else {
return SmtpRequests.mail(from, "BODY=8BITMIME", "SMTPUTF8");
if (ehloResponse.isSupported(Extension.EIGHT_BIT_MIME)) {
localExtensionsList.add("BODY=8BITMIME");
}
if (ehloResponse.isSupported(Extension.SMTPUTF8) && !(isAllAscii(from) && isAllAscii(recipients))) {
localExtensionsList.add("SMTPUTF8");
}

return SmtpRequests.mail(from, localExtensionsList.toArray(new String[0]));
}

private static boolean isAllAscii(String s) {
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/com/hubspot/smtp/IntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -364,6 +365,41 @@ public void itCanSendMultipleEmailsAtOnce() throws Exception {
assertThat(receivedMails.size()).isEqualTo(200);
}

@Test
public void itCanSendAnEmailWithLocalExtensions() throws Exception {

List<String> localExtensionsList = new ArrayList<>();
localExtensionsList.add("XCustomExtension1=SomeValue1");
localExtensionsList.add("XCustomExtension2=SomeValue2");

List<CompletableFuture<Void>> futures = Lists.newArrayList();

Executor executor = Executors.newFixedThreadPool(50);
SmtpSessionFactory factory = new SmtpSessionFactory(SmtpSessionFactoryConfig.nonProductionConfig().withExecutor(executor));

// Avoiding chunking because it hangs with chunking
SmtpSessionConfig pipeliningConfig = getDefaultConfig().withDisabledExtensions(EnumSet.of(Extension.CHUNKING));

factory.connect(pipeliningConfig)
.thenCompose(r -> assertSuccess(r).send(req(EHLO, "hubspot.com")))
.thenCompose(r -> assertSuccess(r).send(
RETURN_PATH,
Lists.newArrayList("a@example.com", "b@example.com"),
createMessageContent(),
localExtensionsList))
.thenCompose(r -> assertSuccess(r).send(req(QUIT)))
.thenCompose(r -> assertSuccess(r).close())
.get();

assertThat(receivedMails.size()).isEqualTo(1);
MailEnvelope mail = receivedMails.get(0);

assertThat(mail.getSender().toString()).isEqualTo(RETURN_PATH);
assertThat(mail.getRecipients().get(0).toString()).isEqualTo("a@example.com");
assertThat(mail.getRecipients().get(1).toString()).isEqualTo("b@example.com");
assertThat(readContents(mail)).contains(MESSAGE_DATA);
}

@Test
public void itSendsKeepAliveCommands() throws Exception {
connect(SmtpSessionConfig.forRemoteAddress(serverAddress).withKeepAliveTimeout(Duration.ofSeconds(1)))
Expand Down