package com.cumulocity.opcua.client.gateway.operation;

import c8y.ua.command.BaseOperation;
import com.cumulocity.model.idtype.GId;
import com.cumulocity.model.operation.OperationStatus;
import com.cumulocity.opcua.client.gateway.configuration.GatewayGeneralConfiguration;
import com.cumulocity.opcua.client.gateway.datastore.DataStore;
import com.cumulocity.opcua.client.gateway.notification.OperationSubscriber;
import com.cumulocity.opcua.client.gateway.operation.handler.base.OperationHandler;
import com.cumulocity.rest.representation.operation.OperationRepresentation;
import com.cumulocity.sdk.client.devicecontrol.DeviceControlApi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import net.jodah.expiringmap.ExpiringMap;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.svenson.SvensonRuntimeException;

@Component
/* loaded from: input_file:BOOT-INF/classes/com/cumulocity/opcua/client/gateway/operation/OperationExecutor.class */
public class OperationExecutor implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) OperationExecutor.class);
    private static final long EXECUTION_DELAY = 100;

    @Autowired
    private GatewayGeneralConfiguration generalConfiguration;

    @Autowired
    private DeviceControlApi deviceControlApi;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private ApplicationContext ctx;

    @Autowired
    private ThreadPoolTaskExecutor executor;
    private Queue<OperationRepresentation> pendingOperations;
    private Future<?> executorJob;
    private final ExpiringMap<String, DateTime> executedOperationsCache = ExpiringMap.builder().expiration(1, TimeUnit.DAYS).build();
    private Collection<OperationHandler<?>> operationHandlers = new ArrayList();
    private Map<String, Object> locks = new ConcurrentHashMap();

    public void submit(OperationRepresentation operationRepresentation) {
        try {
            if (!isOperationInTheQueue(operationRepresentation)) {
                this.pendingOperations.add(operationRepresentation);
            } else if (log.isDebugEnabled()) {
                log.debug("Operation {} has been already in the queue", operationRepresentation.getId().getValue());
            }
        } catch (IllegalStateException e) {
            log.warn("Operations queue is full, rejecting operation: {}", operationRepresentation);
            setOperationFailedWithReason(operationRepresentation.getId(), "Agent operation queue is full!");
        }
    }

    @Scheduled(fixedDelay = 60000)
    public void monitorExecutor() {
        if (this.executorJob.isDone() || this.executorJob.isCancelled()) {
            startOperationExecutor();
        }
    }

    @PostConstruct
    private void startOperationExecutor() {
        this.executorJob = Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                execute();
                try {
                    Thread.sleep(EXECUTION_DELAY);
                } catch (InterruptedException e) {
                    log.error("Operation executor job interrupted", (Throwable) e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    void execute() {
        Object obj;
        while (!this.pendingOperations.isEmpty()) {
            OperationRepresentation poll = this.pendingOperations.poll();
            if (this.locks.containsKey(poll.getId().getValue())) {
                obj = this.locks.get(poll.getId().getValue());
            } else {
                obj = new Object();
                this.locks.put(poll.getId().getValue(), obj);
            }
            Object obj2 = obj;
            this.executor.execute(() -> {
                synchronized (obj2) {
                    try {
                        OperationRepresentation operationRepresentation = poll;
                        GId id = operationRepresentation.getId();
                        if (isOperationOutdated(operationRepresentation)) {
                            try {
                                operationRepresentation = this.deviceControlApi.getOperation(operationRepresentation.getId());
                            } catch (SvensonRuntimeException e) {
                                log.warn("Error parsing operation with Svenson - setting operation " + id.getValue() + " as failed");
                                setOperationFailedWithReason(id, "Error parsing operation JSON! This operation is not supported.");
                                resetLongPollingSession();
                                this.locks.remove(poll.getId().getValue());
                                return;
                            }
                        }
                        if (!OperationStatus.PENDING.name().equalsIgnoreCase(operationRepresentation.getStatus())) {
                            log.info("Operation {} is not in PENDING state, skip executing.", operationRepresentation.getId());
                            this.locks.remove(poll.getId().getValue());
                            return;
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        if (this.executedOperationsCache.containsKey(operationRepresentation.getId().getValue())) {
                            log.info("Operation has been executed, skipping: {}", operationRepresentation.getId());
                            this.locks.remove(poll.getId().getValue());
                            return;
                        }
                        try {
                            log.info("Start executing operation: [{}]", operationRepresentation.getId().getValue());
                            operationRepresentation.setStatus(OperationStatus.EXECUTING.name());
                            this.deviceControlApi.update(operationRepresentation);
                            executeOperation(operationRepresentation);
                            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                            long minutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis2);
                            log.info("Operation [{}] execution has been done, time elapsed: {}", operationRepresentation.getId().getValue(), String.format("%d min, %d seconds", Long.valueOf(minutes), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis2) - TimeUnit.MINUTES.toSeconds(minutes))));
                            this.executedOperationsCache.put(operationRepresentation.getId().getValue(), DateTime.now());
                            this.locks.remove(poll.getId().getValue());
                        } catch (Throwable th) {
                            long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                            long minutes2 = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis3);
                            log.info("Operation [{}] execution has been done, time elapsed: {}", operationRepresentation.getId().getValue(), String.format("%d min, %d seconds", Long.valueOf(minutes2), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis3) - TimeUnit.MINUTES.toSeconds(minutes2))));
                            this.executedOperationsCache.put(operationRepresentation.getId().getValue(), DateTime.now());
                            throw th;
                        }
                    } catch (Throwable th2) {
                        this.locks.remove(poll.getId().getValue());
                        throw th2;
                    }
                }
            });
        }
    }

    private void resetLongPollingSession() {
        ((OperationSubscriber) this.ctx.getBean(OperationSubscriber.class)).resubscribe();
    }

    void executeOperation(OperationRepresentation operationRepresentation) {
        List<OperationHandler> list = (List) this.operationHandlers.stream().filter(operationHandler -> {
            return operationHandler.supports(operationRepresentation);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            setOperationFailedWithReason(operationRepresentation.getId(), "Operation is not supported!");
            return;
        }
        for (OperationHandler operationHandler2 : list) {
            BaseOperation baseOperation = (BaseOperation) operationRepresentation.get(operationHandler2.getSupportedOperationType());
            baseOperation.setOperationId(operationRepresentation.getId());
            baseOperation.setDeviceId(operationRepresentation.getDeviceId());
            operationHandler2.handle(baseOperation);
        }
    }

    private void setOperationFailedWithReason(GId gId, String str) {
        OperationRepresentation operationRepresentation = new OperationRepresentation();
        operationRepresentation.setId(gId);
        operationRepresentation.setStatus(OperationStatus.FAILED.name());
        operationRepresentation.setFailureReason(str);
        try {
            this.deviceControlApi.update(operationRepresentation);
        } catch (SvensonRuntimeException e) {
            log.warn("Ignoring json parsing error on operation update - this is expected when operation JSON is invalid");
            log.warn(e.getMessage());
        }
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        log.info("Registering operation handlers... ");
        this.operationHandlers.clear();
        this.ctx.getBeansOfType(OperationHandler.class).forEach((str, operationHandler) -> {
            log.info("Adding operation handler: {}", str);
            this.operationHandlers.add(operationHandler);
        });
        log.info("Initializing operation executor with queue size: {}", Integer.valueOf(this.generalConfiguration.getOperationQueueSize()));
        this.pendingOperations = new ArrayBlockingQueue(this.generalConfiguration.getOperationQueueSize(), true);
    }

    private boolean isOperationInTheQueue(OperationRepresentation operationRepresentation) {
        if (this.pendingOperations.contains(operationRepresentation)) {
            return true;
        }
        return this.pendingOperations.stream().anyMatch(operationRepresentation2 -> {
            return operationRepresentation2.getId().equals(operationRepresentation.getId());
        });
    }

    private boolean isOperationOutdated(OperationRepresentation operationRepresentation) {
        DateTime creationDateTime = operationRepresentation.getCreationDateTime();
        return !Objects.isNull(creationDateTime) && creationDateTime.plusMinutes(1).isBeforeNow();
    }

    public ExpiringMap<String, DateTime> getExecutedOperationsCache() {
        return this.executedOperationsCache;
    }
}
