From 710922622c67b47d19960532e36f8d9d9af0ecf5 Mon Sep 17 00:00:00 2001 From: Charles Gould Date: Sun, 10 Feb 2019 22:21:53 -0500 Subject: [PATCH] Initialize STOMP heartbeats in JavaFX client --- client/src/main/config/application.yaml | 5 +++ .../lingo/client/bootstrap/LingoClient.java | 2 +- .../client/bootstrap/LingoPresenter.java | 2 + .../client/multiplayer/MultiplayerConfig.java | 37 +++++++++++++++--- .../multiplayer/MultiplayerPresenter.java | 39 ++++++++++--------- .../client/multiplayer/StompTemplate.java | 33 +++++++++++----- server/pom.xml | 6 ++- server/src/main/config/application.yaml | 6 +++ 8 files changed, 94 insertions(+), 36 deletions(-) diff --git a/client/src/main/config/application.yaml b/client/src/main/config/application.yaml index 309b2fb..d58b013 100644 --- a/client/src/main/config/application.yaml +++ b/client/src/main/config/application.yaml @@ -7,3 +7,8 @@ web.socket.url: ws://localhost:8080/sockjs # Production #web.base.url: http://lingo.charego.com #web.socket.url: ws://lingo.charego.com/sockjs + +# Logging +logging: + level: + lingo: DEBUG diff --git a/client/src/main/java/lingo/client/bootstrap/LingoClient.java b/client/src/main/java/lingo/client/bootstrap/LingoClient.java index 4abc24c..3e72965 100644 --- a/client/src/main/java/lingo/client/bootstrap/LingoClient.java +++ b/client/src/main/java/lingo/client/bootstrap/LingoClient.java @@ -60,7 +60,7 @@ public class LingoClient extends Application { @Bean public ExecutorService executorService() { - return Executors.newFixedThreadPool(5, new CustomizableThreadFactory("ClientThread-")); + return Executors.newSingleThreadExecutor(new CustomizableThreadFactory("ClientThread-")); } } diff --git a/client/src/main/java/lingo/client/bootstrap/LingoPresenter.java b/client/src/main/java/lingo/client/bootstrap/LingoPresenter.java index 538af79..30dad88 100644 --- a/client/src/main/java/lingo/client/bootstrap/LingoPresenter.java +++ b/client/src/main/java/lingo/client/bootstrap/LingoPresenter.java @@ -65,6 +65,7 @@ public class LingoPresenter implements FxmlController { MultiplayerPresenter presenter = multiplayerContext.getBean(MultiplayerPresenter.class); presenter.setOnBackButtonPressed(e -> { + log.info("Closing multiplayer..."); multiplayerContext.close(); content.setCenter(gameModeChooser); }); @@ -90,6 +91,7 @@ public class LingoPresenter implements FxmlController { Set guesses = WordReader.readFileToSet("/guesses.txt"); List words = WordReader.readFileToList("/words.txt"); SinglePlayerPresenter presenter = new SinglePlayerPresenter(words, guesses, e -> { + log.info("Closing single player..."); content.setCenter(gameModeChooser); }); content.setCenter(presenter.getNode()); diff --git a/client/src/main/java/lingo/client/multiplayer/MultiplayerConfig.java b/client/src/main/java/lingo/client/multiplayer/MultiplayerConfig.java index f0ecc29..ee1503e 100644 --- a/client/src/main/java/lingo/client/multiplayer/MultiplayerConfig.java +++ b/client/src/main/java/lingo/client/multiplayer/MultiplayerConfig.java @@ -7,16 +7,20 @@ import org.springframework.boot.web.client.RootUriTemplateHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; +import org.springframework.core.task.TaskExecutor; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.client.RestTemplate; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; +import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport; import org.springframework.web.socket.sockjs.client.SockJsClient; import org.springframework.web.socket.sockjs.client.Transport; import org.springframework.web.socket.sockjs.client.WebSocketTransport; @@ -24,20 +28,30 @@ import org.springframework.web.socket.sockjs.client.WebSocketTransport; @Configuration public class MultiplayerConfig { + @Bean + public TaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setMaxPoolSize(4); + executor.setThreadNamePrefix("TaskThread-"); + executor.initialize(); + return executor; + } + @Bean public WebSocketClient webSocketClient() { - WebSocketClient webSocketClient = new StandardWebSocketClient(); List transports = new ArrayList<>(); - transports.add(new WebSocketTransport(webSocketClient)); - SockJsClient sockJsClient = new SockJsClient(transports); - return sockJsClient; + transports.add(new WebSocketTransport(new StandardWebSocketClient())); + transports.add(new RestTemplateXhrTransport()); + return new SockJsClient(transports); } @Bean public WebSocketStompClient stompClient(WebSocketClient webSocketClient, MessageConverter messageConverter) { WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient); stompClient.setMessageConverter(messageConverter); - stompClient.setTaskScheduler(new ThreadPoolTaskScheduler()); + stompClient.setTaskScheduler(heartbeatScheduler()); + stompClient.setDefaultHeartbeat(heartbeatValue()); return stompClient; } @@ -57,4 +71,17 @@ public class MultiplayerConfig { return restTemplate; } + private TaskScheduler heartbeatScheduler() { + // Single thread unless/until more threads are needed + final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(1); + taskScheduler.setThreadNamePrefix("Keep-Alive-"); + taskScheduler.initialize(); + return taskScheduler; + } + + private long[] heartbeatValue() { + return new long[] { 25000, 25000 }; + } + } diff --git a/client/src/main/java/lingo/client/multiplayer/MultiplayerPresenter.java b/client/src/main/java/lingo/client/multiplayer/MultiplayerPresenter.java index 55b2cc8..4648c11 100644 --- a/client/src/main/java/lingo/client/multiplayer/MultiplayerPresenter.java +++ b/client/src/main/java/lingo/client/multiplayer/MultiplayerPresenter.java @@ -5,7 +5,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import javax.annotation.PostConstruct; @@ -13,9 +13,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpMethod; import org.springframework.messaging.simp.stomp.StompFrameHandler; import org.springframework.messaging.simp.stomp.StompHeaders; +import org.springframework.messaging.simp.stomp.StompSession.Subscription; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; @@ -63,7 +65,7 @@ public class MultiplayerPresenter implements FxmlController { private WebView webView; @Autowired - private ExecutorService executorService; + private TaskExecutor taskExecutor; @Autowired private RestTemplate restTemplate; @@ -128,7 +130,7 @@ public class MultiplayerPresenter implements FxmlController { backButton.setOnAction(backButtonHandler); - executorService.execute(() -> { + taskExecutor.execute(() -> { while (subscriptionsLatch.getCount() != 0) { try { subscriptionsLatch.await(); @@ -144,12 +146,14 @@ public class MultiplayerPresenter implements FxmlController { boolean joinedGame = false; for (Game game : games) { if (game.getPlayerTwo() == null) { + log.debug("Joining game..."); stompTemplate.getSession().send("/app/joinGame", game.getId()); joinedGame = true; break; } } if (!joinedGame) { + log.debug("Hosting game..."); stompTemplate.getSession().send("/app/hostGame", null); } }); @@ -165,7 +169,7 @@ public class MultiplayerPresenter implements FxmlController { } else if (keyCode == KeyCode.ENTER) { final String guess = playerBoard.handleEnter(); if (guess != null) { - executorService.execute(() -> stompTemplate.getSession().send("/app/guess", guess)); + taskExecutor.execute(() -> stompTemplate.getSession().send("/app/guess", guess)); repaint(); } } else if (keyCode.isLetterKey()) { @@ -181,21 +185,18 @@ public class MultiplayerPresenter implements FxmlController { @PostConstruct private void postConstruct() { - executorService.execute(() -> { - stompTemplate.subscribe(Destinations.GAME_CLOSED, new GameClosedHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe(Destinations.GAME_HOSTED, new GameHostedHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe(Destinations.GAME_JOINED, new GameJoinedHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe(Destinations.GAME_LEFT, new GameLeftHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe("/user" + Destinations.OPPONENT_JOINED, new OpponentJoinedHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe("/user" + Destinations.OPPONENT_REPORTS, new OpponentReportHandler(), - subscription -> subscriptionsLatch.countDown()); - stompTemplate.subscribe("/user" + Destinations.PLAYER_REPORTS, new PlayerReportHandler(), - subscription -> subscriptionsLatch.countDown()); + Consumer defaultCallback = subscription -> { + subscriptionsLatch.countDown(); + log.debug("Subscription received: {}", subscription.getSubscriptionHeaders()); + }; + taskExecutor.execute(() -> { + stompTemplate.subscribe(Destinations.GAME_CLOSED, new GameClosedHandler(), defaultCallback); + stompTemplate.subscribe(Destinations.GAME_HOSTED, new GameHostedHandler(), defaultCallback); + stompTemplate.subscribe(Destinations.GAME_JOINED, new GameJoinedHandler(), defaultCallback); + stompTemplate.subscribe(Destinations.GAME_LEFT, new GameLeftHandler(), defaultCallback); + stompTemplate.subscribe("/user" + Destinations.OPPONENT_JOINED, new OpponentJoinedHandler(), defaultCallback); + stompTemplate.subscribe("/user" + Destinations.OPPONENT_REPORTS, new OpponentReportHandler(), defaultCallback); + stompTemplate.subscribe("/user" + Destinations.PLAYER_REPORTS, new PlayerReportHandler(), defaultCallback); }); } diff --git a/client/src/main/java/lingo/client/multiplayer/StompTemplate.java b/client/src/main/java/lingo/client/multiplayer/StompTemplate.java index 57821ff..5c28577 100644 --- a/client/src/main/java/lingo/client/multiplayer/StompTemplate.java +++ b/client/src/main/java/lingo/client/multiplayer/StompTemplate.java @@ -1,7 +1,6 @@ package lingo.client.multiplayer; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; @@ -12,6 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.task.TaskExecutor; +import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompFrameHandler; import org.springframework.messaging.simp.stomp.StompHeaders; import org.springframework.messaging.simp.stomp.StompSession; @@ -29,7 +30,7 @@ public class StompTemplate { private String webSocketUrl; @Autowired - private ExecutorService executorService; + private TaskExecutor taskExecutor; @Autowired private WebSocketStompClient stompClient; @@ -48,8 +49,9 @@ public class StompTemplate { @PostConstruct private void postConstruct() { - executorService.execute(() -> stompClient.connect(webSocketUrl, new WebSocketSessionHandler())); - new Thread(new WebSocketSessionListener()).start(); + log.info("Connecting to STOMP endpoint: " + webSocketUrl); + taskExecutor.execute(() -> stompClient.connect(webSocketUrl, new WebSocketSessionHandler())); + taskExecutor.execute(new WebSocketSessionListener()); } @PreDestroy @@ -74,17 +76,17 @@ public class StompTemplate { } private class SubscriptionRequest { - public final String destination; - public final StompFrameHandler handler; - public final Consumer callback; + final String destination; + final StompFrameHandler handler; + final Consumer callback; - public SubscriptionRequest(String destination, StompFrameHandler handler, Consumer callback) { + SubscriptionRequest(String destination, StompFrameHandler handler, Consumer callback) { this.destination = destination; this.handler = handler; this.callback = callback; } - public void onSubscribed(Subscription subscription) { + void onSubscribed(Subscription subscription) { if (callback != null) { callback.accept(subscription); } @@ -97,6 +99,18 @@ public class StompTemplate { log.info("Connected to STOMP endpoint"); stompSession = session; } + + @Override + public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) { + log.error("STOMP session exception", exception); + super.handleException(session, command, headers, payload, exception); + } + + @Override + public void handleTransportError(StompSession session, Throwable exception) { + log.error("STOMP session transport error", exception); + super.handleTransportError(session, exception); + } } private class WebSocketSessionListener implements Runnable { @@ -114,6 +128,7 @@ public class StompTemplate { } try { final SubscriptionRequest request = subscriptionRequests.take(); + log.debug("Subscribing to destination: {}", request.destination); final Subscription subscription = stompSession.subscribe(request.destination, request.handler); request.onSubscribed(subscription); } catch (InterruptedException e) { diff --git a/server/pom.xml b/server/pom.xml index 47c6170..a366bf4 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -43,8 +43,10 @@ true - - src/main/config + + + ${project.basedir}/src/main/config + diff --git a/server/src/main/config/application.yaml b/server/src/main/config/application.yaml index a233791..9fd1b5f 100644 --- a/server/src/main/config/application.yaml +++ b/server/src/main/config/application.yaml @@ -3,3 +3,9 @@ server: address: localhost port: 8080 + +# Logging +logging: + level: + lingo: DEBUG + web: DEBUG