package com.hsrj.platform.starter.canal.client.transfer;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.hsrj.platform.starter.canal.annotation.ListenPoint;
import com.hsrj.platform.starter.canal.client.ListenerPoint;
import com.hsrj.platform.starter.canal.client.exception.CanalClientException;
import com.hsrj.platform.starter.canal.config.CanalConfig;
import com.hsrj.platform.starter.canal.event.CanalEventListener;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hsrj/platform/starter/canal/client/transfer/AbstractBasicMessageTransponder.class */
/**
 * Base transponder that decodes every entry of a Canal {@link Message} into a
 * {@link CanalEntry.RowChange} and forwards each contained row to the registered
 * listeners.
 *
 * <p>Rows are delivered in two ways: to the {@link CanalEventListener} implementations
 * held in {@code listeners}, and to the reflective handler methods held in
 * {@code annoListeners}. Subclasses decide which annotated methods match an event
 * ({@link #getAnnotationFilter}) and which arguments they receive
 * ({@link #getInvokeArgs}).
 */
public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder {
    private static final Logger logger = LoggerFactory.getLogger(AbstractBasicMessageTransponder.class);

    /**
     * Passes all arguments straight through to the parent constructor.
     *
     * @param canalConnector connector handed to the parent transponder
     * @param entry          map entry pairing a name with its {@link CanalConfig.Instance}
     * @param list           interface-based listeners
     * @param list2          annotation-based listener points
     */
    public AbstractBasicMessageTransponder(CanalConnector canalConnector, Map.Entry<String, CanalConfig.Instance> entry, List<CanalEventListener> list, List<ListenerPoint> list2) {
        super(canalConnector, entry, list, list2);
    }

    /**
     * Walks the entries of {@code message}, skipping those whose entry type is listed by
     * {@link #getIgnoreEntryTypes()}, and dispatches the remaining ones.
     *
     * <p>DDL changes go to {@link #processDdl}; for every other change each row is sent
     * first to {@link #distributeByImpl} and then to {@link #distributeByAnnotation}.
     *
     * @param message the batch of entries to distribute
     * @throws CanalClientException if parsing an entry or dispatching one of its rows
     *                              throws; the original exception is kept as the cause
     */
    @Override
    protected void distributeEvent(Message message) {
        // The ignore list does not depend on the entry, so resolve it once per message.
        List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();
        for (CanalEntry.Entry entry : message.getEntries()) {
            if (ignoreEntryTypes != null && ignoreEntryTypes.contains(entry.getEntryType())) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                if (rowChange.hasIsDdl() && rowChange.getIsDdl()) {
                    processDdl(rowChange);
                    continue;
                }
                CanalEntry.Header header = entry.getHeader();
                CanalEntry.EventType eventType = rowChange.getEventType();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    distributeByImpl(header, eventType, rowData);
                    distributeByAnnotation(this.destination, header, eventType, rowData);
                }
            } catch (Exception e) {
                throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry, e);
            }
        }
    }

    /**
     * Hook invoked for row changes flagged as DDL. The default implementation does nothing.
     *
     * @param rowChange the parsed DDL change
     */
    protected void processDdl(CanalEntry.RowChange rowChange) {
    }

    /**
     * Delivers one row to every interface-based listener, if any are registered.
     *
     * @param header    header of the entry the row belongs to
     * @param eventType event type of the enclosing row change
     * @param rowData   the row being delivered
     */
    protected void distributeByImpl(CanalEntry.Header header, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (this.listeners == null) {
            return;
        }
        for (CanalEventListener listener : this.listeners) {
            listener.onEvent(header, eventType, rowData);
        }
    }

    /**
     * Delivers one row to every annotated handler method accepted by
     * {@link #getAnnotationFilter}, invoking it reflectively on the listener point's target
     * with the arguments produced by {@link #getInvokeArgs}.
     *
     * <p>A failure in one handler is logged (with its stack trace) and does not stop the
     * remaining handlers from being called.
     *
     * @param str       first argument forwarded to {@link #getAnnotationFilter}
     *                  ({@code distributeEvent} passes {@code this.destination})
     * @param header    header of the entry the row belongs to
     * @param eventType event type of the enclosing row change
     * @param rowData   the row being delivered
     */
    protected void distributeByAnnotation(String str, CanalEntry.Header header, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        // Mirror the null guard used for the interface-based listeners.
        if (this.annoListeners == null) {
            return;
        }
        this.annoListeners.forEach(listenerPoint -> listenerPoint.getInvokeMap().entrySet().stream()
                .filter(getAnnotationFilter(str, header.getSchemaName(), header.getTableName(), eventType))
                .forEach(entry -> {
                    Method method = entry.getKey();
                    method.setAccessible(true);
                    try {
                        method.invoke(listenerPoint.getTarget(), getInvokeArgs(method, header, eventType, rowData));
                    } catch (Exception e) {
                        // The trailing throwable is logged as the exception, so the
                        // listener's actual failure (and its cause chain) is not lost.
                        logger.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}",
                                Thread.currentThread().getName(),
                                listenerPoint.getTarget().getClass().getName(),
                                method.getName(),
                                e);
                    }
                }));
    }

    /**
     * Builds the predicate that selects which annotated handler methods receive an event.
     *
     * @param str       value passed through from {@link #distributeByAnnotation}
     * @param str2      schema name taken from the entry header
     * @param str3      table name taken from the entry header
     * @param eventType event type of the row change
     * @return predicate applied to each (method, annotation) pair of a listener point
     */
    protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String str, String str2, String str3, CanalEntry.EventType eventType);

    /**
     * Produces the argument array used to reflectively invoke {@code method}.
     *
     * @param method    the handler method about to be invoked
     * @param header    header of the entry the row belongs to
     * @param eventType event type of the row change
     * @param rowData   the row being delivered
     * @return arguments passed to {@link Method#invoke(Object, Object...)}
     */
    protected abstract Object[] getInvokeArgs(Method method, CanalEntry.Header header, CanalEntry.EventType eventType, CanalEntry.RowData rowData);

    /**
     * Entry types that {@link #distributeEvent} skips without parsing.
     *
     * @return an empty list by default; a {@code null} return is treated as "ignore nothing"
     */
    protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
        return Collections.emptyList();
    }
}
