package org.apache.pulsar.websocket;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/RelNotifProducerHandler.class */
public class RelNotifProducerHandler extends ProducerHandler {
    private Producer<byte[]> producer;
    private final LongAdder numMsgsSent;
    private final LongAdder numMsgsFailed;
    private final LongAdder numBytesSent;
    private final StatsBuckets publishLatencyStatsUSec;
    private volatile long msgPublishedCounter;
    private final String keyFile;
    private final ObjectMapper mapper;
    private RelNotifToken token;
    private static final AtomicLongFieldUpdater<RelNotifProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(RelNotifProducerHandler.class, "msgPublishedCounter");
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/websocket/RelNotifProducerHandler$ParseResult.class */
    public static class ParseResult {
        String contextHeader;
        List<String> notificationHeaders;
        String message;

        private ParseResult() {
        }
    }

    public RelNotifProducerHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(null, webSocketService, httpServletRequest, servletUpgradeResponse);
        this.numMsgsSent = new LongAdder();
        this.numMsgsFailed = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.publishLatencyStatsUSec = new StatsBuckets(ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC);
        this.msgPublishedCounter = 0L;
        this.mapper = new ObjectMapper();
        this.token = null;
        this.keyFile = webSocketService.getConfig().getWebSocketPublicKeyFile();
        verifyToken(httpServletRequest);
        this.topic = extractTopicName(httpServletRequest);
        try {
            this.producer = getRelNotifProducerBuilder(webSocketService.getPulsarClient()).topic(this.topic.toString()).create();
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic});
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic, e.getMessage()});
            try {
                servletUpgradeResponse.sendError(getErrorCode(e), getErrorMessage(e));
            } catch (IOException e2) {
                log.warn("[{}:{}] Failed to send error: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), e2.getMessage(), e2});
            }
        }
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected TopicName extractTopicName(HttpServletRequest httpServletRequest) {
        if (this.token == null) {
            return null;
        }
        return this.token.getTopicName();
    }

    private void verifyToken(HttpServletRequest httpServletRequest) {
        String[] strArr = (String[]) httpServletRequest.getParameterMap().get("token");
        if (strArr == null || strArr.length != 1) {
            throw new IllegalArgumentException("query string must contain one and only one token");
        }
        this.token = new RelNotifToken(strArr[0], this.keyFile);
        this.token.verifyWebSocketAllowed();
        this.token.verifyCanWrite();
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.producer != null) {
            if (!this.service.removeProducer(this)) {
                log.warn("[{}] Failed to remove producer handler", this.producer.getTopic());
            }
            this.producer.closeAsync().thenAccept(r5 -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed producer asynchronously", this.producer.getTopic());
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to close producer", this.producer.getTopic(), th);
                return null;
            });
        }
    }

    private ParseResult parseHeaders(String str) {
        ParseResult parseResult = new ParseResult();
        ArrayList arrayList = new ArrayList(8);
        while (true) {
            int indexOf = str.indexOf(10);
            if (indexOf == -1) {
                break;
            }
            String substring = str.substring(0, indexOf);
            str = str.substring(indexOf + 1);
            if (substring.length() == 0) {
                break;
            }
            arrayList.add(substring);
        }
        parseResult.message = str;
        if (arrayList.isEmpty()) {
            return parseResult;
        }
        parseResult.contextHeader = (String) arrayList.get(0);
        parseResult.notificationHeaders = arrayList.subList(1, arrayList.size());
        return parseResult;
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public void onWebSocketText(String str) {
        byte[] decode;
        JsonNode jsonNode;
        String asText;
        ParseResult parseHeaders = parseHeaders(str);
        String str2 = parseHeaders.contextHeader;
        if (str2 == null) {
            throw new WebSocketException("Expected context id in message");
        }
        String str3 = parseHeaders.message;
        boolean isBinaryNamespace = RelNotifToken.isBinaryNamespace(this.token.getTopicName().getNamespacePortion());
        if (isBinaryNamespace) {
            try {
                decode = Base64.getDecoder().decode(str3);
            } catch (IllegalArgumentException e) {
                sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, str2));
                return;
            }
        } else {
            decode = str3.getBytes(StandardCharsets.UTF_8);
        }
        long length = decode.length;
        TypedMessageBuilder newMessage = this.producer.newMessage();
        String str4 = null;
        if (!parseHeaders.notificationHeaders.isEmpty()) {
            str4 = parseHeaders.notificationHeaders.get(0);
        } else if (!isBinaryNamespace) {
            try {
                JsonNode findValue = this.mapper.readTree(str3).findValue("source");
                if (findValue != null && (jsonNode = findValue.get("id")) != null && (asText = jsonNode.asText()) != null) {
                    if (!asText.isEmpty()) {
                        str4 = asText;
                    }
                }
            } catch (Exception e2) {
                log.warn("Failed to extract source object id from message." + e2.getMessage());
            }
        }
        if (str4 != null && !str4.isEmpty()) {
            newMessage.key(str4);
        }
        try {
            newMessage.value(decode);
            long nanoTime = System.nanoTime();
            newMessage.sendAsync().thenAccept(messageId -> {
                updateSentMsgStats(length, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime));
                if (isConnected()) {
                    sendAckResponse(new ProducerAck(Base64.getEncoder().encodeToString(messageId.toByteArray()), str2));
                }
            }).exceptionally(th -> {
                log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}", new Object[]{this.producer.getTopic(), getRemote().getInetSocketAddress().toString(), th.getMessage()});
                this.numMsgsFailed.increment();
                sendAckResponse(new ProducerAck(WebSocketError.UnknownError, th.getMessage(), null, str2));
                return null;
            });
        } catch (SchemaSerializationException e3) {
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e3.getMessage(), null, str2));
        }
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public Producer<byte[]> getProducer() {
        return this.producer;
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler
    public long getMsgPublishedCounter() {
        return this.msgPublishedCounter;
    }

    @Override // org.apache.pulsar.websocket.ProducerHandler, org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected Boolean isAuthorized(String str, AuthenticationDataSource authenticationDataSource) throws Exception {
        return true;
    }

    private void sendAckResponse(ProducerAck producerAck) {
        try {
            String str = producerAck.context;
            if (!producerAck.result.equals("ok")) {
                str = str + "\n" + producerAck.errorMsg;
            }
            getSession().getRemote().sendString(str, new WriteCallback() { // from class: org.apache.pulsar.websocket.RelNotifProducerHandler.1
                public void writeFailed(Throwable th) {
                    RelNotifProducerHandler.log.warn("[{}] Failed to send ack: {}", RelNotifProducerHandler.this.producer.getTopic(), th.getMessage());
                }

                public void writeSuccess() {
                    if (RelNotifProducerHandler.log.isDebugEnabled()) {
                        RelNotifProducerHandler.log.debug("[{}] Ack was sent successfully to {}", RelNotifProducerHandler.this.producer.getTopic(), RelNotifProducerHandler.this.getRemote().getInetSocketAddress().toString());
                    }
                }
            });
        } catch (Exception e) {
            log.warn("[{}] Failed to send ack: {}", this.producer.getTopic(), e.getMessage());
        }
    }

    private void updateSentMsgStats(long j, long j2) {
        this.publishLatencyStatsUSec.addValue(j2);
        this.numBytesSent.add(j);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    private ProducerBuilder<byte[]> getRelNotifProducerBuilder(PulsarClient pulsarClient) {
        ProducerBuilder<byte[]> messageRoutingMode = pulsarClient.newProducer().enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition);
        messageRoutingMode.blockIfQueueFull(false);
        Optional<Integer> batchingMaxMessages = this.token.getBatchingMaxMessages(this.queryParams);
        if (batchingMaxMessages.isPresent()) {
            messageRoutingMode.batchingMaxMessages(batchingMaxMessages.get().intValue());
        }
        Optional<Integer> maxPendingMessages = this.token.getMaxPendingMessages(this.queryParams);
        if (maxPendingMessages.isPresent()) {
            messageRoutingMode.batchingMaxMessages(maxPendingMessages.get().intValue());
        }
        return messageRoutingMode;
    }
}
