Skip to content

Refactorización de la clase Socket #79

@Kcmoranj

Description

@Kcmoranj

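En la clase Socket se identificaron varios code smells que afectaban la modularidad, la flexibilidad y la mantenibilidad del código. Este issue describe las técnicas de refactorización aplicadas para resolver estos problemas: Introduce Null Object (`NullWebSocket`), Introduce Local Extension (`WebSocketManager`) y Replace Magic Number with Symbolic Constant (`CLOSE_GOING_AWAY`). A continuación se muestra la clase refactorizada: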
package org.phoenixframework.channels;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class Socket {

private static final Logger log = LoggerFactory.getLogger(Socket.class);

// **Introduce Null Object** - Creación de WebSocket "nulo"
public class NullWebSocket extends WebSocket {
    @Override
    public void send(String text) {
        // No hace nada si el WebSocket es nulo
    }

    @Override
    public void close(int code, String reason) {
        // No hace nada si el WebSocket es nulo
    }

    @Override
    public void onMessage(String text) {
        // No hace nada si el WebSocket es nulo
    }
}

// **Introduce Local Extension** - WebSocketManager como extensión local
private class WebSocketManager {
    private WebSocket webSocket = null;
    private final OkHttpClient httpClient = new OkHttpClient();
    private final String endpointUri;

    public WebSocketManager(String endpointUri) {
        this.endpointUri = endpointUri;
    }

    public void connect() throws IOException {
        log.trace("connect");
        final String httpUrl = this.endpointUri.replaceFirst("^ws:", "http:")
                .replaceFirst("^wss:", "https:");
        final Request request = new Request.Builder().url(httpUrl).build();
        webSocket = httpClient.newWebSocket(request, wsListener);
    }

    public void disconnect() throws IOException {
        log.trace("disconnect");
        if (webSocket != null) {
            webSocket.close(CLOSE_GOING_AWAY, "Disconnected by client");
        }
    }

    public boolean isConnected() {
        return webSocket != null;
    }

    public WebSocket getWebSocket() {
        return webSocket;
    }
}

// Replace Magic Number with Symbolic Constant
public static final int RECONNECT_INTERVAL_MS = 5000;
private static final int DEFAULT_HEARTBEAT_INTERVAL = 7000;
private static final int CLOSE_GOING_AWAY = 1001;  // Nueva constante simbólica

private final List<Channel> channels = new ArrayList<>();
private String endpointUri = null;
private final Set<IErrorCallback> errorCallbacks = Collections.newSetFromMap(new HashMap<IErrorCallback, Boolean>());
private final int heartbeatInterval;
private TimerTask heartbeatTimerTask = null;
private final Set<IMessageCallback> messageCallbacks = Collections.newSetFromMap(new HashMap<IMessageCallback, Boolean>());
private final ObjectMapper objectMapper = new ObjectMapper();
private boolean reconnectOnFailure = true;
private TimerTask reconnectTimerTask = null;
private int refNo = 1;
private final LinkedBlockingQueue<RequestBody> sendBuffer = new LinkedBlockingQueue<>();
private final Set<ISocketCloseCallback> socketCloseCallbacks = Collections.newSetFromMap(new HashMap<ISocketCloseCallback, Boolean>());
private final Set<ISocketOpenCallback> socketOpenCallbacks = Collections.newSetFromMap(new HashMap<ISocketOpenCallback, Boolean>());
private Timer timer = null;

// **Uso de WebSocketManager** - Se crea la instancia del WebSocketManager para manejar WebSocket.
private WebSocketManager webSocketManager;

private final PhoenixWSListener wsListener = new PhoenixWSListener();

public Socket(final String endpointUri) throws IOException {
    this(endpointUri, DEFAULT_HEARTBEAT_INTERVAL);
}

public Socket(final String endpointUri, final int heartbeatIntervalInMs) {
    log.trace("PhoenixSocket({})", endpointUri);
    this.endpointUri = endpointUri;
    this.heartbeatInterval = heartbeatIntervalInMs;
    this.timer = new Timer("Reconnect Timer for " + endpointUri);
    this.webSocketManager = new WebSocketManager(endpointUri); // Crear el WebSocketManager
}

public Channel chan(final String topic, final JsonNode payload) {
    log.trace("chan: {}, {}", topic, payload);
    final Channel channel = new Channel(topic, payload, Socket.this);
    synchronized (channels) {
        channels.add(channel);
    }
    return channel;
}

public void connect() throws IOException {
    webSocketManager.connect(); // Delegar la conexión a WebSocketManager
}

public void disconnect() throws IOException {
    webSocketManager.disconnect(); // Delegar la desconexión a WebSocketManager
}

public boolean isConnected() {
    return webSocketManager.isConnected(); // Delegar la verificación de conexión
}

public Socket onClose(final ISocketCloseCallback callback) {
    this.socketCloseCallbacks.add(callback);
    return this;
}

public Socket onError(final IErrorCallback callback) {
    this.errorCallbacks.add(callback);
    return this;
}

public Socket onMessage(final IMessageCallback callback) {
    this.messageCallbacks.add(callback);
    return this;
}

public Socket onOpen(final ISocketOpenCallback callback) {
    cancelReconnectTimer();
    this.socketOpenCallbacks.add(callback);
    return this;
}

public Socket push(final Envelope envelope) throws IOException {
    final ObjectNode node = objectMapper.createObjectNode();
    node.put("topic", envelope.getTopic());
    node.put("event", envelope.getEvent());
    node.put("ref", envelope.getRef());
    node.put("join_ref", envelope.getJoinRef());
    node.set("payload", envelope.getPayload() == null ? objectMapper.createObjectNode() : envelope.getPayload());
    final String json = objectMapper.writeValueAsString(node);

    log.trace("push: {}, isConnected:{}, JSON:{}", envelope, isConnected(), json);

    RequestBody body = RequestBody.create(MediaType.parse("text/xml"), json);

    if (this.isConnected()) {
        webSocketManager.getWebSocket().send(json); // Usar el WebSocketManager para enviar el mensaje
    } else {
        this.sendBuffer.add(body);
    }

    return this;
}

public void reconectOnFailure(final boolean reconnectOnFailure) {
    this.reconnectOnFailure = reconnectOnFailure;
}

public void remove(final Channel channel) {
    synchronized (channels) {
        for (final Iterator chanIter = channels.iterator(); chanIter.hasNext(); ) {
            if (chanIter.next() == channel) {
                chanIter.remove();
                break;
            }
        }
    }
}

public void removeAllChannels() {
    synchronized (channels) {
        channels.clear();
    }
}

@Override
public String toString() {
    return "PhoenixSocket{" +
        "endpointUri='" + endpointUri + '\'' +
        ", channels(" + channels.size() + ")=" + channels +
        ", refNo=" + refNo +
        ", webSocket=" + webSocketManager.getWebSocket() +
        '}';
}

synchronized String makeRef() {
    refNo = (refNo + 1) % Integer.MAX_VALUE;
    return Integer.toString(refNo);
}

private void cancelHeartbeatTimer() {
    if (Socket.this.heartbeatTimerTask != null) {
        Socket.this.heartbeatTimerTask.cancel();
    }
}

private void cancelReconnectTimer() {
    if (Socket.this.reconnectTimerTask != null) {
        Socket.this.reconnectTimerTask.cancel();
    }
}

private void flushSendBuffer() {
    while (this.isConnected() && !this.sendBuffer.isEmpty()) {
        final RequestBody body = this.sendBuffer.remove();
        this.webSocketManager.getWebSocket().send(body.toString());
    }
}

private void scheduleReconnectTimer() {
    cancelReconnectTimer();
    cancelHeartbeatTimer();

    Socket.this.reconnectTimerTask = new TimerTask() {
        @Override
        public void run() {
            log.trace("reconnectTimerTask run");
            try {
                Socket.this.connect();
            } catch (Exception e) {
                log.error("Failed to reconnect to " + Socket.this.wsListener, e);
            }
        }
    };
    timer.schedule(Socket.this.reconnectTimerTask, RECONNECT_INTERVAL_MS);
}

private void startHeartbeatTimer() {
    Socket.this.heartbeatTimerTask = new TimerTask() {
        @Override
        public void run() {
            log.trace("heartbeatTimerTask run");
            if (Socket.this.isConnected()) {
                try {
                    Envelope envelope = new Envelope("phoenix", "heartbeat",
                        new ObjectNode(JsonNodeFactory.instance), Socket.this.makeRef(), null);
                    Socket.this.push(envelope);
                } catch (Exception e) {
                    log.error("Failed to send heartbeat", e);
                }
            }
        }
    };

    timer.schedule(Socket.this.heartbeatTimerTask, Socket.this.heartbeatInterval,
        Socket.this.heartbeatInterval);
}

private void triggerChannelError() {
    synchronized (channels) {
        for (final Channel channel : channels) {
            channel.trigger(ChannelEvent.ERROR.getPhxEvent(), null);
        }
    }
}

static String replyEventName(final String ref) {
    return "chan_reply_" + ref;
}

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions