/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.websocket.JwtClaimsHelper;
import org.apache.pulsar.websocket.WebSocketService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RelNotifControlServlet
extends HttpServlet {
    public static final String SERVLET_PATH = "/notification2/control";
    private static final long serialVersionUID = 1L;
    private final transient WebSocketService service;
    private static final Logger log = LoggerFactory.getLogger(RelNotifControlServlet.class);
    private static ExecutorService provisionerExecutor = Executors.newSingleThreadExecutor();

    public RelNotifControlServlet(WebSocketService service) {
        this.service = service;
    }

    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        resp.setContentType("application/json;charset=utf-8");
        String pathInfo = req.getPathInfo();
        Map params = req.getParameterMap();
        if (!"/unsubscribe".equals(pathInfo)) {
            throw new ServletException("Unknown c8y notification2/control request");
        }
        this.unsubscribe(resp, params);
    }

    private PulsarAdmin getPulsarAdmin() throws PulsarServerException, PulsarClientException {
        String adminUrl = this.service.getServiceUrl();
        return PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
    }

    private void unsubscribe(HttpServletResponse resp, Map<String, String[]> params) throws ServletException, IOException {
        String result;
        TokenInfo token = this.validateToken(params);
        boolean force = this.getBooleanParam(params, "force");
        boolean background = this.getBooleanParam(params, "background");
        if (background) {
            provisionerExecutor.execute(() -> this.unsubscribe(token, force));
            result = "{ \"result\": \"scheduled\" }";
        } else {
            Optional<Exception> exn = this.unsubscribe(token, force);
            if (exn.isPresent()) {
                resp.sendError(500, "Unsubscribe failed with: " + String.valueOf(exn));
                return;
            }
            result = "{ \"result\": \"done\" }";
        }
        resp.getWriter().print(result);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<Exception> unsubscribe(TokenInfo token, boolean force) {
        try (PulsarAdmin pulsarAdmin = this.getPulsarAdmin();){
            pulsarAdmin.topics().deleteSubscription(token.tenantName + "/" + token.namespaceName + "/" + token.topicName, token.subscription, force);
        }
        catch (PulsarServerException | PulsarAdminException | PulsarClientException exn) {
            log.error("Unsubscribe subscriber failed: ", exn);
            return Optional.of(exn);
        }
        return Optional.empty();
    }

    private boolean getBooleanParam(Map<String, String[]> params, String which) {
        String forceValue;
        boolean param = false;
        if (params.containsKey(which) && ("true".equals(forceValue = params.get(which)[0]) || "1".equals(forceValue))) {
            param = true;
        }
        return param;
    }

    private TokenInfo validateToken(Map<String, String[]> params) throws ServletException {
        Map<String, Object> claims;
        if (!params.containsKey("token")) {
            throw new ServletException("Expected token query parameter");
        }
        String token = params.get("token")[0];
        String keyFile = this.service.getConfig().getWebSocketPublicKeyFile();
        try {
            claims = JwtClaimsHelper.getJwtClaims(token, keyFile);
        }
        catch (Exception e) {
            throw new ServletException("Failed to verify token " + e.getMessage(), (Throwable)e);
        }
        String topic = (String)claims.get("topic");
        if (topic == null) {
            throw new ServletException("Expected topic in token");
        }
        String sub = (String)claims.get("sub");
        if (sub == null) {
            throw new ServletException("Expected sub in token");
        }
        return new TokenInfo(topic, sub);
    }

    private static class TokenInfo {
        String subscription;
        String tenantName;
        String namespaceName;
        String topicName;

        TokenInfo(String topic, String subscription) throws ServletException {
            this.subscription = subscription;
            String[] topicParts = topic.split("/");
            if (topicParts.length != 3) {
                throw new ServletException("Expected tenant/namespace/topic in token");
            }
            this.tenantName = topicParts[0];
            this.namespaceName = topicParts[1];
            this.topicName = topicParts[2];
        }
    }
}

