-
Notifications
You must be signed in to change notification settings - Fork 98
Description
En la clase Socket, se identificaron varios code smells que afectaban la modularidad, la flexibilidad y la mantenibilidad del código. Este issue describe las técnicas de refactorización aplicadas para resolver estos problemas.
package org.phoenixframework.channels;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class Socket {
private static final Logger log = LoggerFactory.getLogger(Socket.class);
// **Introduce Null Object** - Creación de WebSocket "nulo"
public class NullWebSocket extends WebSocket {
@Override
public void send(String text) {
// No hace nada si el WebSocket es nulo
}
@Override
public void close(int code, String reason) {
// No hace nada si el WebSocket es nulo
}
@Override
public void onMessage(String text) {
// No hace nada si el WebSocket es nulo
}
}
// **Introduce Local Extension** - WebSocketManager como extensión local
private class WebSocketManager {
private WebSocket webSocket = null;
private final OkHttpClient httpClient = new OkHttpClient();
private final String endpointUri;
public WebSocketManager(String endpointUri) {
this.endpointUri = endpointUri;
}
public void connect() throws IOException {
log.trace("connect");
final String httpUrl = this.endpointUri.replaceFirst("^ws:", "http:")
.replaceFirst("^wss:", "https:");
final Request request = new Request.Builder().url(httpUrl).build();
webSocket = httpClient.newWebSocket(request, wsListener);
}
public void disconnect() throws IOException {
log.trace("disconnect");
if (webSocket != null) {
webSocket.close(CLOSE_GOING_AWAY, "Disconnected by client");
}
}
public boolean isConnected() {
return webSocket != null;
}
public WebSocket getWebSocket() {
return webSocket;
}
}
// Replace Magic Number with Symbolic Constant
public static final int RECONNECT_INTERVAL_MS = 5000;
private static final int DEFAULT_HEARTBEAT_INTERVAL = 7000;
private static final int CLOSE_GOING_AWAY = 1001; // Nueva constante simbólica
private final List<Channel> channels = new ArrayList<>();
private String endpointUri = null;
private final Set<IErrorCallback> errorCallbacks = Collections.newSetFromMap(new HashMap<IErrorCallback, Boolean>());
private final int heartbeatInterval;
private TimerTask heartbeatTimerTask = null;
private final Set<IMessageCallback> messageCallbacks = Collections.newSetFromMap(new HashMap<IMessageCallback, Boolean>());
private final ObjectMapper objectMapper = new ObjectMapper();
private boolean reconnectOnFailure = true;
private TimerTask reconnectTimerTask = null;
private int refNo = 1;
private final LinkedBlockingQueue<RequestBody> sendBuffer = new LinkedBlockingQueue<>();
private final Set<ISocketCloseCallback> socketCloseCallbacks = Collections.newSetFromMap(new HashMap<ISocketCloseCallback, Boolean>());
private final Set<ISocketOpenCallback> socketOpenCallbacks = Collections.newSetFromMap(new HashMap<ISocketOpenCallback, Boolean>());
private Timer timer = null;
// **Uso de WebSocketManager** - Se crea la instancia del WebSocketManager para manejar WebSocket.
private WebSocketManager webSocketManager;
private final PhoenixWSListener wsListener = new PhoenixWSListener();
public Socket(final String endpointUri) throws IOException {
this(endpointUri, DEFAULT_HEARTBEAT_INTERVAL);
}
public Socket(final String endpointUri, final int heartbeatIntervalInMs) {
log.trace("PhoenixSocket({})", endpointUri);
this.endpointUri = endpointUri;
this.heartbeatInterval = heartbeatIntervalInMs;
this.timer = new Timer("Reconnect Timer for " + endpointUri);
this.webSocketManager = new WebSocketManager(endpointUri); // Crear el WebSocketManager
}
public Channel chan(final String topic, final JsonNode payload) {
log.trace("chan: {}, {}", topic, payload);
final Channel channel = new Channel(topic, payload, Socket.this);
synchronized (channels) {
channels.add(channel);
}
return channel;
}
public void connect() throws IOException {
webSocketManager.connect(); // Delegar la conexión a WebSocketManager
}
public void disconnect() throws IOException {
webSocketManager.disconnect(); // Delegar la desconexión a WebSocketManager
}
public boolean isConnected() {
return webSocketManager.isConnected(); // Delegar la verificación de conexión
}
public Socket onClose(final ISocketCloseCallback callback) {
this.socketCloseCallbacks.add(callback);
return this;
}
public Socket onError(final IErrorCallback callback) {
this.errorCallbacks.add(callback);
return this;
}
public Socket onMessage(final IMessageCallback callback) {
this.messageCallbacks.add(callback);
return this;
}
public Socket onOpen(final ISocketOpenCallback callback) {
cancelReconnectTimer();
this.socketOpenCallbacks.add(callback);
return this;
}
public Socket push(final Envelope envelope) throws IOException {
final ObjectNode node = objectMapper.createObjectNode();
node.put("topic", envelope.getTopic());
node.put("event", envelope.getEvent());
node.put("ref", envelope.getRef());
node.put("join_ref", envelope.getJoinRef());
node.set("payload", envelope.getPayload() == null ? objectMapper.createObjectNode() : envelope.getPayload());
final String json = objectMapper.writeValueAsString(node);
log.trace("push: {}, isConnected:{}, JSON:{}", envelope, isConnected(), json);
RequestBody body = RequestBody.create(MediaType.parse("text/xml"), json);
if (this.isConnected()) {
webSocketManager.getWebSocket().send(json); // Usar el WebSocketManager para enviar el mensaje
} else {
this.sendBuffer.add(body);
}
return this;
}
public void reconectOnFailure(final boolean reconnectOnFailure) {
this.reconnectOnFailure = reconnectOnFailure;
}
public void remove(final Channel channel) {
synchronized (channels) {
for (final Iterator chanIter = channels.iterator(); chanIter.hasNext(); ) {
if (chanIter.next() == channel) {
chanIter.remove();
break;
}
}
}
}
public void removeAllChannels() {
synchronized (channels) {
channels.clear();
}
}
@Override
public String toString() {
return "PhoenixSocket{" +
"endpointUri='" + endpointUri + '\'' +
", channels(" + channels.size() + ")=" + channels +
", refNo=" + refNo +
", webSocket=" + webSocketManager.getWebSocket() +
'}';
}
synchronized String makeRef() {
refNo = (refNo + 1) % Integer.MAX_VALUE;
return Integer.toString(refNo);
}
private void cancelHeartbeatTimer() {
if (Socket.this.heartbeatTimerTask != null) {
Socket.this.heartbeatTimerTask.cancel();
}
}
private void cancelReconnectTimer() {
if (Socket.this.reconnectTimerTask != null) {
Socket.this.reconnectTimerTask.cancel();
}
}
private void flushSendBuffer() {
while (this.isConnected() && !this.sendBuffer.isEmpty()) {
final RequestBody body = this.sendBuffer.remove();
this.webSocketManager.getWebSocket().send(body.toString());
}
}
private void scheduleReconnectTimer() {
cancelReconnectTimer();
cancelHeartbeatTimer();
Socket.this.reconnectTimerTask = new TimerTask() {
@Override
public void run() {
log.trace("reconnectTimerTask run");
try {
Socket.this.connect();
} catch (Exception e) {
log.error("Failed to reconnect to " + Socket.this.wsListener, e);
}
}
};
timer.schedule(Socket.this.reconnectTimerTask, RECONNECT_INTERVAL_MS);
}
private void startHeartbeatTimer() {
Socket.this.heartbeatTimerTask = new TimerTask() {
@Override
public void run() {
log.trace("heartbeatTimerTask run");
if (Socket.this.isConnected()) {
try {
Envelope envelope = new Envelope("phoenix", "heartbeat",
new ObjectNode(JsonNodeFactory.instance), Socket.this.makeRef(), null);
Socket.this.push(envelope);
} catch (Exception e) {
log.error("Failed to send heartbeat", e);
}
}
}
};
timer.schedule(Socket.this.heartbeatTimerTask, Socket.this.heartbeatInterval,
Socket.this.heartbeatInterval);
}
private void triggerChannelError() {
synchronized (channels) {
for (final Channel channel : channels) {
channel.trigger(ChannelEvent.ERROR.getPhxEvent(), null);
}
}
}
static String replyEventName(final String ref) {
return "chan_reply_" + ref;
}
}