Initialize STOMP heartbeats in JavaFX client

This commit is contained in:
Charles Gould 2019-02-10 22:21:53 -05:00
parent 8392543515
commit 710922622c
8 changed files with 94 additions and 36 deletions

View File

@ -7,3 +7,8 @@ web.socket.url: ws://localhost:8080/sockjs
# Production # Production
#web.base.url: http://lingo.charego.com #web.base.url: http://lingo.charego.com
#web.socket.url: ws://lingo.charego.com/sockjs #web.socket.url: ws://lingo.charego.com/sockjs
# Logging
logging:
level:
lingo: DEBUG

View File

@ -60,7 +60,7 @@ public class LingoClient extends Application {
@Bean @Bean
public ExecutorService executorService() { public ExecutorService executorService() {
return Executors.newFixedThreadPool(5, new CustomizableThreadFactory("ClientThread-")); return Executors.newSingleThreadExecutor(new CustomizableThreadFactory("ClientThread-"));
} }
} }

View File

@ -65,6 +65,7 @@ public class LingoPresenter implements FxmlController {
MultiplayerPresenter presenter = multiplayerContext.getBean(MultiplayerPresenter.class); MultiplayerPresenter presenter = multiplayerContext.getBean(MultiplayerPresenter.class);
presenter.setOnBackButtonPressed(e -> { presenter.setOnBackButtonPressed(e -> {
log.info("Closing multiplayer...");
multiplayerContext.close(); multiplayerContext.close();
content.setCenter(gameModeChooser); content.setCenter(gameModeChooser);
}); });
@ -90,6 +91,7 @@ public class LingoPresenter implements FxmlController {
Set<String> guesses = WordReader.readFileToSet("/guesses.txt"); Set<String> guesses = WordReader.readFileToSet("/guesses.txt");
List<String> words = WordReader.readFileToList("/words.txt"); List<String> words = WordReader.readFileToList("/words.txt");
SinglePlayerPresenter presenter = new SinglePlayerPresenter(words, guesses, e -> { SinglePlayerPresenter presenter = new SinglePlayerPresenter(words, guesses, e -> {
log.info("Closing single player...");
content.setCenter(gameModeChooser); content.setCenter(gameModeChooser);
}); });
content.setCenter(presenter.getNode()); content.setCenter(presenter.getNode());

View File

@ -7,16 +7,20 @@ import org.springframework.boot.web.client.RootUriTemplateHandler;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient; import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient; import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport; import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport; import org.springframework.web.socket.sockjs.client.WebSocketTransport;
@ -24,20 +28,30 @@ import org.springframework.web.socket.sockjs.client.WebSocketTransport;
@Configuration @Configuration
public class MultiplayerConfig { public class MultiplayerConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("TaskThread-");
executor.initialize();
return executor;
}
@Bean @Bean
public WebSocketClient webSocketClient() { public WebSocketClient webSocketClient() {
WebSocketClient webSocketClient = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(); List<Transport> transports = new ArrayList<>();
transports.add(new WebSocketTransport(webSocketClient)); transports.add(new WebSocketTransport(new StandardWebSocketClient()));
SockJsClient sockJsClient = new SockJsClient(transports); transports.add(new RestTemplateXhrTransport());
return sockJsClient; return new SockJsClient(transports);
} }
@Bean @Bean
public WebSocketStompClient stompClient(WebSocketClient webSocketClient, MessageConverter messageConverter) { public WebSocketStompClient stompClient(WebSocketClient webSocketClient, MessageConverter messageConverter) {
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient); WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(messageConverter); stompClient.setMessageConverter(messageConverter);
stompClient.setTaskScheduler(new ThreadPoolTaskScheduler()); stompClient.setTaskScheduler(heartbeatScheduler());
stompClient.setDefaultHeartbeat(heartbeatValue());
return stompClient; return stompClient;
} }
@ -57,4 +71,17 @@ public class MultiplayerConfig {
return restTemplate; return restTemplate;
} }
private TaskScheduler heartbeatScheduler() {
// Single thread unless/until more threads are needed
final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(1);
taskScheduler.setThreadNamePrefix("Keep-Alive-");
taskScheduler.initialize();
return taskScheduler;
}
private long[] heartbeatValue() {
return new long[] { 25000, 25000 };
}
} }

View File

@ -5,7 +5,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.function.Consumer;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -13,9 +13,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.messaging.simp.stomp.StompFrameHandler; import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders; import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
@ -63,7 +65,7 @@ public class MultiplayerPresenter implements FxmlController {
private WebView webView; private WebView webView;
@Autowired @Autowired
private ExecutorService executorService; private TaskExecutor taskExecutor;
@Autowired @Autowired
private RestTemplate restTemplate; private RestTemplate restTemplate;
@ -128,7 +130,7 @@ public class MultiplayerPresenter implements FxmlController {
backButton.setOnAction(backButtonHandler); backButton.setOnAction(backButtonHandler);
executorService.execute(() -> { taskExecutor.execute(() -> {
while (subscriptionsLatch.getCount() != 0) { while (subscriptionsLatch.getCount() != 0) {
try { try {
subscriptionsLatch.await(); subscriptionsLatch.await();
@ -144,12 +146,14 @@ public class MultiplayerPresenter implements FxmlController {
boolean joinedGame = false; boolean joinedGame = false;
for (Game game : games) { for (Game game : games) {
if (game.getPlayerTwo() == null) { if (game.getPlayerTwo() == null) {
log.debug("Joining game...");
stompTemplate.getSession().send("/app/joinGame", game.getId()); stompTemplate.getSession().send("/app/joinGame", game.getId());
joinedGame = true; joinedGame = true;
break; break;
} }
} }
if (!joinedGame) { if (!joinedGame) {
log.debug("Hosting game...");
stompTemplate.getSession().send("/app/hostGame", null); stompTemplate.getSession().send("/app/hostGame", null);
} }
}); });
@ -165,7 +169,7 @@ public class MultiplayerPresenter implements FxmlController {
} else if (keyCode == KeyCode.ENTER) { } else if (keyCode == KeyCode.ENTER) {
final String guess = playerBoard.handleEnter(); final String guess = playerBoard.handleEnter();
if (guess != null) { if (guess != null) {
executorService.execute(() -> stompTemplate.getSession().send("/app/guess", guess)); taskExecutor.execute(() -> stompTemplate.getSession().send("/app/guess", guess));
repaint(); repaint();
} }
} else if (keyCode.isLetterKey()) { } else if (keyCode.isLetterKey()) {
@ -181,21 +185,18 @@ public class MultiplayerPresenter implements FxmlController {
@PostConstruct @PostConstruct
private void postConstruct() { private void postConstruct() {
executorService.execute(() -> { Consumer<Subscription> defaultCallback = subscription -> {
stompTemplate.subscribe(Destinations.GAME_CLOSED, new GameClosedHandler(), subscriptionsLatch.countDown();
subscription -> subscriptionsLatch.countDown()); log.debug("Subscription received: {}", subscription.getSubscriptionHeaders());
stompTemplate.subscribe(Destinations.GAME_HOSTED, new GameHostedHandler(), };
subscription -> subscriptionsLatch.countDown()); taskExecutor.execute(() -> {
stompTemplate.subscribe(Destinations.GAME_JOINED, new GameJoinedHandler(), stompTemplate.subscribe(Destinations.GAME_CLOSED, new GameClosedHandler(), defaultCallback);
subscription -> subscriptionsLatch.countDown()); stompTemplate.subscribe(Destinations.GAME_HOSTED, new GameHostedHandler(), defaultCallback);
stompTemplate.subscribe(Destinations.GAME_LEFT, new GameLeftHandler(), stompTemplate.subscribe(Destinations.GAME_JOINED, new GameJoinedHandler(), defaultCallback);
subscription -> subscriptionsLatch.countDown()); stompTemplate.subscribe(Destinations.GAME_LEFT, new GameLeftHandler(), defaultCallback);
stompTemplate.subscribe("/user" + Destinations.OPPONENT_JOINED, new OpponentJoinedHandler(), stompTemplate.subscribe("/user" + Destinations.OPPONENT_JOINED, new OpponentJoinedHandler(), defaultCallback);
subscription -> subscriptionsLatch.countDown()); stompTemplate.subscribe("/user" + Destinations.OPPONENT_REPORTS, new OpponentReportHandler(), defaultCallback);
stompTemplate.subscribe("/user" + Destinations.OPPONENT_REPORTS, new OpponentReportHandler(), stompTemplate.subscribe("/user" + Destinations.PLAYER_REPORTS, new PlayerReportHandler(), defaultCallback);
subscription -> subscriptionsLatch.countDown());
stompTemplate.subscribe("/user" + Destinations.PLAYER_REPORTS, new PlayerReportHandler(),
subscription -> subscriptionsLatch.countDown());
}); });
} }

View File

@ -1,7 +1,6 @@
package lingo.client.multiplayer; package lingo.client.multiplayer;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -12,6 +11,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler; import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders; import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession; import org.springframework.messaging.simp.stomp.StompSession;
@ -29,7 +30,7 @@ public class StompTemplate {
private String webSocketUrl; private String webSocketUrl;
@Autowired @Autowired
private ExecutorService executorService; private TaskExecutor taskExecutor;
@Autowired @Autowired
private WebSocketStompClient stompClient; private WebSocketStompClient stompClient;
@ -48,8 +49,9 @@ public class StompTemplate {
@PostConstruct @PostConstruct
private void postConstruct() { private void postConstruct() {
executorService.execute(() -> stompClient.connect(webSocketUrl, new WebSocketSessionHandler())); log.info("Connecting to STOMP endpoint: " + webSocketUrl);
new Thread(new WebSocketSessionListener()).start(); taskExecutor.execute(() -> stompClient.connect(webSocketUrl, new WebSocketSessionHandler()));
taskExecutor.execute(new WebSocketSessionListener());
} }
@PreDestroy @PreDestroy
@ -74,17 +76,17 @@ public class StompTemplate {
} }
private class SubscriptionRequest { private class SubscriptionRequest {
public final String destination; final String destination;
public final StompFrameHandler handler; final StompFrameHandler handler;
public final Consumer<Subscription> callback; final Consumer<Subscription> callback;
public SubscriptionRequest(String destination, StompFrameHandler handler, Consumer<Subscription> callback) { SubscriptionRequest(String destination, StompFrameHandler handler, Consumer<Subscription> callback) {
this.destination = destination; this.destination = destination;
this.handler = handler; this.handler = handler;
this.callback = callback; this.callback = callback;
} }
public void onSubscribed(Subscription subscription) { void onSubscribed(Subscription subscription) {
if (callback != null) { if (callback != null) {
callback.accept(subscription); callback.accept(subscription);
} }
@ -97,6 +99,18 @@ public class StompTemplate {
log.info("Connected to STOMP endpoint"); log.info("Connected to STOMP endpoint");
stompSession = session; stompSession = session;
} }
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
log.error("STOMP session exception", exception);
super.handleException(session, command, headers, payload, exception);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
log.error("STOMP session transport error", exception);
super.handleTransportError(session, exception);
}
} }
private class WebSocketSessionListener implements Runnable { private class WebSocketSessionListener implements Runnable {
@ -114,6 +128,7 @@ public class StompTemplate {
} }
try { try {
final SubscriptionRequest request = subscriptionRequests.take(); final SubscriptionRequest request = subscriptionRequests.take();
log.debug("Subscribing to destination: {}", request.destination);
final Subscription subscription = stompSession.subscribe(request.destination, request.handler); final Subscription subscription = stompSession.subscribe(request.destination, request.handler);
request.onSubscribed(subscription); request.onSubscribed(subscription);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -43,8 +43,10 @@
<!-- Add src/main/resources to the classpath --> <!-- Add src/main/resources to the classpath -->
<!-- Remove duplicate resources from target/classes --> <!-- Remove duplicate resources from target/classes -->
<addResources>true</addResources> <addResources>true</addResources>
<folders>
<!-- Add src/main/config to the classpath --> <!-- Add src/main/config to the classpath -->
<folders>src/main/config</folders> <folder>${project.basedir}/src/main/config</folder>
</folders>
</configuration> </configuration>
<executions> <executions>
<!-- Repackage as executable JAR (java -jar) --> <!-- Repackage as executable JAR (java -jar) -->

View File

@ -3,3 +3,9 @@
server: server:
address: localhost address: localhost
port: 8080 port: 8080
# Logging
logging:
level:
lingo: DEBUG
web: DEBUG