/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.AbstractEpollStreamChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.proxy.server.BrokerProxyValidator;
import org.apache.pulsar.proxy.server.Claims;
import org.apache.pulsar.proxy.server.DirectProxyHandler;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyClientCnx;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.apache.pulsar.proxy.server.TargetAddressDeniedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyConnection
extends PulsarHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
    private ConnectionPool connectionPool;
    private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0L, 0x3FFFFFFFFFFFFFFFL));
    private final ProxyService service;
    private final DnsAddressResolverGroup dnsAddressResolverGroup;
    private State state;
    private LookupProxyHandler lookupProxyHandler = null;
    private DirectProxyHandler directProxyHandler = null;
    private ScheduledFuture<?> authRefreshTask;
    private long authChallengeSentTime = Long.MAX_VALUE;
    private FeatureFlags features;
    private Set<CompletableFuture<AuthData>> pendingBrokerAuthChallenges = null;
    private final BrokerProxyValidator brokerProxyValidator;
    private final ConnectionController connectionController;
    String clientAuthRole;
    volatile AuthData clientAuthData;
    String clientAuthMethod;
    String clientVersion;
    private String authMethod = "none";
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    private ClientConfigurationData clientConf;
    private boolean hasProxyToBrokerUrl;
    private int protocolVersionToAdvertise;
    private String proxyToBrokerUrl;
    private HAProxyMessage haProxyMessage;
    protected static final Integer SPLICE_BYTES = 0x40000000;
    private static final byte[] EMPTY_CREDENTIALS = new byte[0];
    boolean isTlsInboundChannel = false;
    private Claims claims;

    ConnectionPool getConnectionPool() {
        return this.connectionPool;
    }

    public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAddressResolverGroup) {
        super(30, TimeUnit.SECONDS);
        this.service = proxyService;
        this.dnsAddressResolverGroup = dnsAddressResolverGroup;
        this.state = State.Init;
        this.brokerProxyValidator = this.service.getBrokerProxyValidator();
        this.connectionController = proxyService.getConnectionController();
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        ProxyService.ACTIVE_CONNECTIONS.inc();
        SocketAddress rmAddress = ctx.channel().remoteAddress();
        ConnectionController.State state = this.connectionController.increaseConnection(rmAddress);
        if (!state.equals((Object)ConnectionController.State.OK)) {
            ctx.writeAndFlush((Object)Commands.newError((long)-1L, (ServerError)ServerError.NotAllowedError, (String)(state.equals((Object)ConnectionController.State.REACH_MAX_CONNECTION) ? "Reached the maximum number of connections" : "Reached the maximum number of connections on address" + String.valueOf(rmAddress)))).addListener(result -> ctx.close());
            ProxyService.REJECTED_CONNECTIONS.inc();
        }
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        this.connectionController.decreaseConnection(ctx.channel().remoteAddress());
        ProxyService.ACTIVE_CONNECTIONS.dec();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ProxyService.NEW_CONNECTIONS.inc();
        this.service.getClientCnxs().add(this);
        this.isTlsInboundChannel = ProxyConnection.isTlsChannel(ctx.channel());
        LOG.info("[{}] New connection opened", (Object)this.remoteAddress);
    }

    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.directProxyHandler != null) {
            this.directProxyHandler.close();
            this.directProxyHandler = null;
        }
        if (this.authRefreshTask != null) {
            this.authRefreshTask.cancel(false);
        }
        if (this.pendingBrokerAuthChallenges != null) {
            this.pendingBrokerAuthChallenges.forEach(future -> future.cancel(true));
            this.pendingBrokerAuthChallenges = null;
        }
        this.service.getClientCnxs().remove((Object)this);
        LOG.info("[{}] Connection closed", (Object)this.remoteAddress);
        if (this.connectionPool != null) {
            try {
                this.connectionPool.close();
                this.connectionPool = null;
            }
            catch (Exception e) {
                LOG.error("Failed to close connection pool {}", (Object)e.getMessage(), (Object)e);
            }
        }
        this.state = State.Closed;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.warn("[{}] Got exception {} : Message: {} State: {}", new Object[]{this.remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), this.state, ClientCnx.isKnownException((Throwable)cause) ? null : cause});
        if (this.state != State.Closed) {
            this.state = State.Closing;
        }
        if (ctx.channel().isOpen()) {
            ctx.close();
        } else if (this.directProxyHandler != null) {
            this.directProxyHandler.close();
            this.directProxyHandler = null;
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null) {
            this.directProxyHandler.outboundChannel.config().setAutoRead(ctx.channel().isWritable());
        }
        super.channelWritabilityChanged(ctx);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HAProxyMessage) {
            this.haProxyMessage = (HAProxyMessage)msg;
            return;
        }
        switch (this.state) {
            case Init: 
            case Connecting: 
            case ProxyLookupRequests: {
                super.channelRead(ctx, msg);
                break;
            }
            case ProxyConnectionToBroker: {
                if (this.directProxyHandler != null) {
                    ProxyService.OPS_COUNTER.inc();
                    if (msg instanceof ByteBuf) {
                        int bytes = ((ByteBuf)msg).readableBytes();
                        this.directProxyHandler.getInboundChannelRequestsRate().recordEvent((long)bytes);
                        ProxyService.BYTES_COUNTER.inc((double)bytes);
                    }
                    this.directProxyHandler.outboundChannel.writeAndFlush(msg, this.directProxyHandler.outboundChannel.voidPromise());
                    if (!this.service.proxyZeroCopyModeEnabled || this.service.proxyLogLevel != 0 || this.directProxyHandler.isTlsOutboundChannel || this.isTlsInboundChannel) break;
                    if (ctx.pipeline().get("readTimeoutHandler") != null) {
                        ctx.pipeline().remove("readTimeoutHandler");
                    }
                    ProxyConnection.spliceNIC2NIC((EpollSocketChannel)ctx.channel(), (EpollSocketChannel)this.directProxyHandler.outboundChannel, SPLICE_BYTES).addListener(future -> {
                        ProxyService.OPS_COUNTER.inc();
                        ProxyService.BYTES_COUNTER.inc((double)SPLICE_BYTES.intValue());
                        this.directProxyHandler.getInboundChannelRequestsRate().recordEvent((long)SPLICE_BYTES.intValue());
                    });
                    break;
                }
                LOG.warn("Received message of type {} while connection to broker is missing in state {}. Dropping the input message (readable bytes={}).", new Object[]{msg.getClass(), this.state, msg instanceof ByteBuf ? ((ByteBuf)msg).readableBytes() : -1});
                break;
            }
            case ProxyConnectingToBroker: {
                LOG.warn("Received message of type {} while connecting to broker. Dropping the input message (readable bytes={}).", msg.getClass(), (Object)(msg instanceof ByteBuf ? ((ByteBuf)msg).readableBytes() : -1));
                break;
            }
        }
    }

    protected static ChannelPromise spliceNIC2NIC(EpollSocketChannel inboundChannel, EpollSocketChannel outboundChannel, int spliceLength) {
        ChannelPromise promise = inboundChannel.newPromise();
        inboundChannel.spliceTo((AbstractEpollStreamChannel)outboundChannel, spliceLength, promise);
        promise.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException)) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }
        }));
        return promise;
    }

    protected static boolean isTlsChannel(Channel channel) {
        return channel.pipeline().get("tls") != null;
    }

    private synchronized void completeConnect() throws PulsarClientException {
        Preconditions.checkArgument((this.state == State.Connecting ? 1 : 0) != 0);
        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole, this.hasProxyToBrokerUrl});
        if (this.hasProxyToBrokerUrl) {
            if (this.service.getConfiguration().isCheckActiveBrokers() && !this.isBrokerActive(this.proxyToBrokerUrl)) {
                this.state = State.Closing;
                LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole});
                ByteBuf msg = Commands.newError((long)-1L, (ServerError)ServerError.ServiceNotReady, (String)"Target broker isn't available.");
                this.writeAndFlushAndClose(msg);
                return;
            }
            this.state = State.ProxyConnectingToBroker;
            ((CompletableFuture)this.brokerProxyValidator.resolveAndCheckTargetAddress(this.proxyToBrokerUrl).thenAcceptAsync(this::connectToBroker, (Executor)this.ctx.executor())).exceptionally(throwable -> {
                if (throwable instanceof TargetAddressDeniedException || throwable.getCause() instanceof TargetAddressDeniedException) {
                    TargetAddressDeniedException targetAddressDeniedException = (TargetAddressDeniedException)(throwable instanceof TargetAddressDeniedException ? throwable : throwable.getCause());
                    LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, targetAddressDeniedException.getMessage(), this.authMethod, this.clientAuthRole});
                } else {
                    LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole, throwable});
                }
                ByteBuf msg = Commands.newError((long)-1L, (ServerError)ServerError.ServiceNotReady, (String)"Target broker cannot be validated.");
                this.writeAndFlushAndClose(msg);
                return null;
            });
        } else {
            Supplier<ClientCnx> clientCnxSupplier = this.service.getConfiguration().isAuthenticationEnabled() ? () -> new ProxyClientCnx(this.clientConf, this.service.getWorkerGroup(), this.clientAuthRole, this.clientAuthMethod, this.protocolVersionToAdvertise, this.service.getConfiguration().isForwardAuthorizationCredentials(), this) : () -> new ClientCnx(this.clientConf, this.service.getWorkerGroup(), this.protocolVersionToAdvertise);
            if (this.connectionPool == null) {
                this.connectionPool = new ConnectionPool(this.clientConf, this.service.getWorkerGroup(), clientCnxSupplier, Optional.of(this.dnsAddressResolverGroup.getResolver((EventExecutor)this.service.getWorkerGroup().next())));
            } else {
                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", new Object[]{this.remoteAddress, this.state, this.clientAuthRole});
            }
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = this.service.newLookupProxyHandler(this, this.claims);
            if (this.service.getConfiguration().isAuthenticationEnabled() && this.service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
                this.authRefreshTask = this.ctx.executor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::refreshAuthenticationCredentialsAndCloseIfTooExpired), (long)this.service.getConfiguration().getAuthenticationRefreshCheckSeconds(), (long)this.service.getConfiguration().getAuthenticationRefreshCheckSeconds(), TimeUnit.SECONDS);
            }
            ByteBuf msg = Commands.newConnected((int)this.protocolVersionToAdvertise, (boolean)false);
            this.writeAndFlush(msg);
        }
    }

    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
        assert (this.ctx.executor().inEventLoop());
        if (this.state == State.ProxyConnectingToBroker && this.ctx.channel().isOpen() && this.directProxyHandler == null) {
            this.directProxyHandler = directProxyHandler;
            this.state = State.ProxyConnectionToBroker;
            int maxMessageSize = connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : -1;
            ByteBuf msg = Commands.newConnected((int)connected.getProtocolVersion(), (int)maxMessageSize, (connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers() ? 1 : 0) != 0);
            this.writeAndFlush(msg);
        } else {
            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. Closing connection to broker '{}'.", new Object[]{this.remoteAddress, this.ctx.channel().isOpen() ? "open" : "already closed", this.state != State.ProxyConnectingToBroker ? "invalid state " + String.valueOf((Object)this.state) : "state " + String.valueOf((Object)this.state), this.proxyToBrokerUrl});
            directProxyHandler.close();
            this.ctx.close();
        }
    }

    private void connectToBroker(InetSocketAddress brokerAddress) {
        assert (this.ctx.executor().inEventLoop());
        DirectProxyHandler directProxyHandler = new DirectProxyHandler(this.service, this, this.claims);
        directProxyHandler.connect(this.proxyToBrokerUrl, brokerAddress, this.protocolVersionToAdvertise);
    }

    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
        try {
            CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
            this.handleBrokerConnected(directProxyHandler, finalConnected);
        }
        catch (RejectedExecutionException e) {
            LOG.error("Event loop was already closed. Closing broker connection.", (Throwable)e);
            directProxyHandler.close();
        }
        catch (AssertionError e) {
            LOG.error("Failed assertion, closing direct proxy handler.", (Throwable)((Object)e));
            directProxyHandler.close();
        }
    }

    private void doAuthentication(AuthData clientData) throws Exception {
        this.authState.authenticateAsync(clientData).whenCompleteAsync((authChallenge, throwable) -> {
            if (throwable == null) {
                this.authChallengeSuccessCallback((AuthData)authChallenge);
            } else {
                this.authenticationFailedCallback((Throwable)throwable);
            }
        }, (Executor)this.ctx.executor());
    }

    protected void authenticationFailedCallback(Throwable t) {
        LOG.warn("[{}] Unable to authenticate: ", (Object)this.remoteAddress, (Object)t);
        ByteBuf msg = Commands.newError((long)-1L, (ServerError)ServerError.AuthenticationError, (String)"Failed to authenticate");
        this.writeAndFlushAndClose(msg);
    }

    protected void authChallengeSuccessCallback(AuthData authChallenge) {
        try {
            if (authChallenge == null) {
                this.clientAuthRole = this.authState.getAuthRole();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole});
                }
                if (this.state == State.Connecting) {
                    this.completeConnect();
                }
                return;
            }
            ByteBuf msg = Commands.newAuthChallenge((String)this.authMethod, (AuthData)authChallenge, (int)this.protocolVersionToAdvertise);
            this.writeAndFlush(msg);
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Authentication in progress client by method {}.", (Object)this.remoteAddress, (Object)this.authMethod);
            }
        }
        catch (Exception e) {
            this.authenticationFailedCallback(e);
        }
    }

    private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
        assert (this.ctx.executor().inEventLoop());
        if (this.state != State.ProxyLookupRequests) {
            return;
        }
        if (!this.authState.isExpired()) {
            return;
        }
        if (System.nanoTime() - this.authChallengeSentTime > TimeUnit.SECONDS.toNanos(this.service.getConfiguration().getAuthenticationRefreshCheckSeconds())) {
            LOG.warn("[{}] Closing connection after timeout on refreshing auth credentials", (Object)this.remoteAddress);
            this.ctx.close();
        }
        this.maybeSendAuthChallenge();
    }

    private void maybeSendAuthChallenge() {
        assert (this.ctx.executor().inEventLoop());
        if (!this.supportsAuthenticationRefresh()) {
            LOG.warn("[{}] Closing connection because client doesn't support auth credentials refresh", (Object)this.remoteAddress);
            this.ctx.close();
            return;
        }
        if (this.authChallengeSentTime != Long.MAX_VALUE) {
            return;
        }
        if (this.service.getConfiguration().getAuthenticationRefreshCheckSeconds() < 1) {
            LOG.warn("[{}] Closing connection because auth credentials refresh is disabled", (Object)this.remoteAddress);
            this.ctx.close();
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Refreshing authentication credentials", (Object)this.remoteAddress);
        }
        try {
            AuthData challenge = this.authState.refreshAuthentication();
            this.writeAndFlush(Commands.newAuthChallenge((String)this.authMethod, (AuthData)challenge, (int)this.protocolVersionToAdvertise));
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", (Object)this.remoteAddress, (Object)this.authMethod);
            }
            this.authChallengeSentTime = System.nanoTime();
        }
        catch (AuthenticationException e) {
            LOG.warn("[{}] Failed to refresh authentication: {}", (Object)this.remoteAddress, (Object)e);
            this.ctx.close();
        }
    }

    protected void handleConnect(CommandConnect connect) {
        Preconditions.checkArgument((this.state == State.Init ? 1 : 0) != 0);
        this.state = State.Connecting;
        this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
        this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
        this.protocolVersionToAdvertise = ProxyConnection.getProtocolVersionToAdvertise(connect);
        this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
        this.clientVersion = connect.getClientVersion();
        this.features = new FeatureFlags();
        if (connect.hasFeatureFlags()) {
            this.features.copyFrom(connect.getFeatureFlags());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", (Object)this.remoteAddress, (Object)this.proxyToBrokerUrl);
            LOG.debug("[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}", new Object[]{this.remoteAddress, this.protocolVersionToAdvertise, this.getRemoteEndpointProtocolVersion(), Commands.getCurrentProtocolVersion()});
        }
        if (this.getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", (Object)this.remoteAddress);
            this.state = State.Closing;
            this.ctx.close();
            return;
        }
        if (connect.hasProxyVersion()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Client illegally provided proxyVersion.", (Object)this.remoteAddress);
            }
            this.state = State.Closing;
            this.writeAndFlushAndClose(Commands.newError((long)-1L, (ServerError)ServerError.NotAllowedError, (String)"Must not provide proxyVersion"));
            return;
        }
        try {
            this.clientConf = this.createClientConfiguration();
            this.claims = Claims.authenticateC8Y(this.service, connect);
            if (!this.service.getConfiguration().isAuthenticationEnabled()) {
                this.completeConnect();
                return;
            }
            AuthData clientData = AuthData.of((byte[])(connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS));
            this.authMethod = connect.hasAuthMethodName() ? connect.getAuthMethodName() : (connect.hasAuthMethod() ? connect.getAuthMethod().name().substring(10).toLowerCase() : "none");
            if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
                this.clientAuthData = clientData;
                this.clientAuthMethod = this.authMethod;
            }
            this.authenticationProvider = this.service.getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.clientAuthRole = (String)this.service.getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.completeConnect();
                return;
            }
            ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            this.doAuthentication(clientData);
        }
        catch (Exception e) {
            this.authenticationFailedCallback(e);
        }
    }

    protected void handleAuthResponse(CommandAuthResponse authResponse) {
        Preconditions.checkArgument((boolean)authResponse.hasResponse());
        Preconditions.checkArgument((authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName() ? 1 : 0) != 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received AuthResponse from {}, auth method: {}", (Object)this.remoteAddress, (Object)authResponse.getResponse().getAuthMethodName());
        }
        try {
            this.authChallengeSentTime = Long.MAX_VALUE;
            AuthData clientData = AuthData.of((byte[])authResponse.getResponse().getAuthData());
            this.doAuthentication(clientData);
            if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
                this.clientAuthData = clientData;
                if (this.pendingBrokerAuthChallenges != null && !this.pendingBrokerAuthChallenges.isEmpty()) {
                    for (CompletableFuture<AuthData> challenge : this.pendingBrokerAuthChallenges) {
                        challenge.complete(clientData);
                    }
                    this.pendingBrokerAuthChallenges.clear();
                }
            }
        }
        catch (Exception e) {
            String errorMsg = "Unable to handleAuthResponse";
            LOG.warn("[{}] {} ", new Object[]{this.remoteAddress, errorMsg, e});
            ByteBuf msg = Commands.newError((long)-1L, (ServerError)ServerError.AuthenticationError, (String)errorMsg);
            this.writeAndFlushAndClose(msg);
        }
    }

    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
    }

    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
    }

    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleGetSchema(commandGetSchema);
    }

    protected void handleLookup(CommandLookupTopic lookup) {
        Preconditions.checkArgument((this.state == State.ProxyLookupRequests ? 1 : 0) != 0);
        this.lookupProxyHandler.handleLookup(lookup);
    }

    ClientConfigurationData createClientConfiguration() {
        ClientConfigurationData initialConf = new ClientConfigurationData();
        ProxyConfiguration proxyConfig = this.service.getConfiguration();
        initialConf.setServiceUrl(proxyConfig.isTlsEnabledWithBroker() ? this.service.getServiceUrlTls() : this.service.getServiceUrl());
        Map overrides = PropertiesUtils.filterAndMapProperties((Properties)proxyConfig.getProperties(), (String)"brokerClient_");
        ClientConfigurationData clientConf = (ClientConfigurationData)ConfigurationDataUtils.loadData((Map)overrides, (Object)initialConf, ClientConfigurationData.class);
        initialConf.setConnectionMaxIdleSeconds(-1);
        clientConf.setAuthentication(this.getClientAuthentication());
        if (proxyConfig.isTlsEnabledWithBroker()) {
            clientConf.setUseTls(true);
            clientConf.setTlsHostnameVerificationEnable(proxyConfig.isTlsHostnameVerificationEnabled());
            if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
                clientConf.setUseKeyStoreTls(true);
                clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
                clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
                clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
                clientConf.setTlsKeyStoreType(proxyConfig.getBrokerClientTlsKeyStoreType());
                clientConf.setTlsKeyStorePath(proxyConfig.getBrokerClientTlsKeyStore());
                clientConf.setTlsKeyStorePassword(proxyConfig.getBrokerClientTlsKeyStorePassword());
            } else {
                clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
                clientConf.setTlsKeyFilePath(proxyConfig.getBrokerClientKeyFilePath());
                clientConf.setTlsCertificateFilePath(proxyConfig.getBrokerClientCertificateFilePath());
            }
            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
        }
        return clientConf;
    }

    private static int getProtocolVersionToAdvertise(CommandConnect connect) {
        return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
    }

    long newRequestId() {
        return this.requestIdGenerator.getAndIncrement();
    }

    public Authentication getClientAuthentication() {
        return this.service.getProxyClientAuthenticationPlugin();
    }

    protected boolean isHandshakeCompleted() {
        return this.state != State.Init;
    }

    SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    public boolean hasHAProxyMessage() {
        return this.haProxyMessage != null;
    }

    public HAProxyMessage getHAProxyMessage() {
        return this.haProxyMessage;
    }

    private boolean isBrokerActive(String targetBrokerHostPort) {
        for (ServiceLookupData serviceLookupData : this.getAvailableBrokers()) {
            if (!ProxyConnection.matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort) && !ProxyConnection.matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(), targetBrokerHostPort)) continue;
            return true;
        }
        return false;
    }

    private List<? extends ServiceLookupData> getAvailableBrokers() {
        if (this.service.getDiscoveryProvider() == null) {
            LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null.zookeeperServers and configurationStoreServers must be configured in proxy configuration when checkActiveBrokers is enabled.");
            return Collections.emptyList();
        }
        try {
            return this.service.getDiscoveryProvider().getAvailableBrokers();
        }
        catch (PulsarServerException e) {
            LOG.error("Unable to get available brokers", (Throwable)e);
            return Collections.emptyList();
        }
    }

    static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
        return pulsarServiceUrl != null && pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length() && pulsarServiceUrl.startsWith(expectedPrefix) && pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length());
    }

    private void writeAndFlush(ByteBuf cmd) {
        NettyChannelUtil.writeAndFlushWithVoidPromise((ChannelOutboundInvoker)this.ctx, (ByteBuf)cmd);
    }

    private void writeAndFlushAndClose(ByteBuf cmd) {
        NettyChannelUtil.writeAndFlushWithClosePromise((ChannelOutboundInvoker)this.ctx, (ByteBuf)cmd);
    }

    boolean supportsAuthenticationRefresh() {
        return this.features != null && this.features.isSupportsAuthRefresh();
    }

    AuthData getClientAuthData() {
        return this.clientAuthData;
    }

    CompletableFuture<AuthData> getValidClientAuthData() {
        CompletableFuture<AuthData> clientAuthDataFuture = new CompletableFuture<AuthData>();
        this.ctx().executor().execute(Runnables.catchingAndLoggingThrowables(() -> {
            if (!this.authState.isExpired()) {
                clientAuthDataFuture.complete(this.clientAuthData);
            } else if (this.state == State.ProxyLookupRequests) {
                this.maybeSendAuthChallenge();
                if (this.pendingBrokerAuthChallenges == null) {
                    this.pendingBrokerAuthChallenges = new HashSet<CompletableFuture<AuthData>>();
                }
                this.pendingBrokerAuthChallenges.add(clientAuthDataFuture);
            } else {
                clientAuthDataFuture.completeExceptionally((Throwable)new PulsarClientException.AlreadyClosedException("ProxyConnection is not in a valid state to get client auth data for " + String.valueOf(this.remoteAddress)));
            }
        }));
        return clientAuthDataFuture;
    }

    public DirectProxyHandler getDirectProxyHandler() {
        return this.directProxyHandler;
    }

    static enum State {
        Init,
        Connecting,
        ProxyLookupRequests,
        ProxyConnectingToBroker,
        ProxyConnectionToBroker,
        Closing,
        Closed;

    }
}

