package com.hsrj.platform.starter.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.hsrj.platform.starter.canal.annotation.ListenPoint;
import com.hsrj.platform.starter.canal.client.transfer.TransponderFactory;
import com.hsrj.platform.starter.canal.config.CanalConfig;
import com.hsrj.platform.starter.canal.event.CanalEventListener;
import com.hsrj.platform.starter.canal.util.BeanUtil;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;

/* loaded from: input_file:com/hsrj/platform/starter/canal/client/SimpleCanalClient.class */
/**
 * Canal client that hands every connector it is asked to process to a transponder
 * running on an internal thread pool.
 *
 * <p>Listeners are collected once, during construction, from {@link BeanUtil}: beans that
 * implement {@link CanalEventListener}, plus {@link ListenPoint}-annotated methods found on
 * beans carrying the {@code CanalEventListener} annotation.
 */
public class SimpleCanalClient extends AbstractCanalClient {
    private static final Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class);

    /**
     * Pool running the transponders: no core threads, no upper bound, idle threads are
     * reclaimed after 60 seconds, and tasks are handed off directly through a
     * {@link SynchronousQueue} rather than being queued.
     */
    private final ThreadPoolExecutor executor;

    /** Listeners discovered as beans implementing the {@link CanalEventListener} interface. */
    private final List<CanalEventListener> listeners = new ArrayList<>();

    /** Listener methods discovered through the {@link ListenPoint} annotation. */
    private final List<ListenerPoint> annoListeners = new ArrayList<>();

    /**
     * Creates the client, builds its thread pool and collects the available listeners.
     *
     * @param canalConfig        configuration forwarded to the parent client
     * @param transponderFactory factory forwarded to the parent client
     */
    public SimpleCanalClient(CanalConfig canalConfig, TransponderFactory transponderFactory) {
        super(canalConfig, transponderFactory);
        this.executor = new ThreadPoolExecutor(
                0,
                Integer.MAX_VALUE,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory());
        initListeners();
    }

    /**
     * Creates a transponder for the given connector/instance pair and submits it to the pool.
     *
     * <p>NOTE(review): the {@code Future} returned by {@code submit} is discarded, so a failure
     * inside the transponder is not reported from here — confirm the transponder logs its own
     * errors.
     *
     * @param canalConnector connector the transponder will work with
     * @param entry          instance name mapped to its configuration
     */
    @Override
    protected void process(CanalConnector canalConnector, Map.Entry<String, CanalConfig.Instance> entry) {
        this.executor.submit(this.factory.newTransponder(canalConnector, entry, this.listeners, this.annoListeners));
    }

    /**
     * Stops the parent client first, then shuts the pool down. {@code shutdown()} rejects new
     * tasks but does not interrupt the ones already running.
     */
    @Override
    public void stop() {
        super.stop();
        this.executor.shutdown();
    }

    /** Populates both listener lists and warns when neither source yields a listener. */
    private void initListeners() {
        logger.info("{}: initializing the listeners....", Thread.currentThread().getName());
        registerInterfaceListeners();
        registerAnnotatedListeners();
        logger.info("{}: initializing the listeners end.", Thread.currentThread().getName());
        if (this.listeners.isEmpty() && this.annoListeners.isEmpty()) {
            logger.warn("{}: No listener found in context! ", Thread.currentThread().getName());
        }
    }

    /** Adds every bean implementing {@link CanalEventListener}; a null lookup result is ignored. */
    private void registerInterfaceListeners() {
        List<CanalEventListener> found = BeanUtil.getBeansOfType(CanalEventListener.class);
        if (found != null) {
            this.listeners.addAll(found);
        }
    }

    /**
     * Scans beans carrying the {@code CanalEventListener} annotation and records each method
     * annotated with {@link ListenPoint}. Only methods declared by the bean's runtime class are
     * inspected; inherited methods are not scanned.
     */
    private void registerAnnotatedListeners() {
        Map<String, Object> annotatedBeans =
                BeanUtil.getBeansWithAnnotation(com.hsrj.platform.starter.canal.annotation.CanalEventListener.class);
        if (annotatedBeans == null) {
            return;
        }
        for (Object bean : annotatedBeans.values()) {
            // getDeclaredMethods() never returns null; an empty array just skips the loop.
            for (Method method : bean.getClass().getDeclaredMethods()) {
                ListenPoint listenPoint = AnnotationUtils.findAnnotation(method, ListenPoint.class);
                if (listenPoint != null) {
                    this.annoListeners.add(new ListenerPoint(bean, method, listenPoint));
                }
            }
        }
    }
}
