WeixinMessageDispatcher.java

package com.foxinmy.weixin4j.dispatcher;

import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;

import java.io.ByteArrayInputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;

import com.foxinmy.weixin4j.handler.WeixinMessageHandler;
import com.foxinmy.weixin4j.interceptor.WeixinMessageInterceptor;
import com.foxinmy.weixin4j.request.WeixinMessage;
import com.foxinmy.weixin4j.request.WeixinRequest;
import com.foxinmy.weixin4j.response.BlankResponse;
import com.foxinmy.weixin4j.response.WeixinResponse;
import com.foxinmy.weixin4j.socket.WeixinMessageTransfer;
import com.foxinmy.weixin4j.util.ClassUtil;
import com.foxinmy.weixin4j.util.HttpUtil;
import com.foxinmy.weixin4j.util.ServerToolkits;
import com.foxinmy.weixin4j.xml.MessageTransferHandler;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * Dispatcher for incoming WeChat messages.
 * <p>
 * For every request it parses the raw payload, resolves the concrete message
 * class through the {@link WeixinMessageMatcher}, unmarshals the XML with JAXB,
 * selects the matching {@link WeixinMessageHandler} with the highest weight and
 * runs it surrounded by the configured {@link WeixinMessageInterceptor}s.
 * <p>
 * Handlers and interceptors are resolved lazily on first use and cached
 * afterwards. The lazy resolution itself is not synchronized.
 *
 * @className WeixinMessageDispatcher
 * @author jinyu(foxinmy@gmail.com)
 * @date 2015-05-07
 * @since JDK 1.6
 * @see com.foxinmy.weixin4j.handler.WeixinMessageHandler
 * @see com.foxinmy.weixin4j.interceptor.WeixinMessageInterceptor
 * @see com.foxinmy.weixin4j.dispatcher.WeixinMessageMatcher
 * @see com.foxinmy.weixin4j.dispatcher.MessageHandlerExecutor
 * @see com.foxinmy.weixin4j.dispatcher.BeanFactory
 */
public class WeixinMessageDispatcher {

    private final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());

    /**
     * Message handlers: those supplied through the setter plus those discovered
     * in {@link #messageHandlerPackages}.
     */
    private List<WeixinMessageHandler> messageHandlerList;
    /**
     * Array snapshot of {@link #messageHandlerList}, built on first use.
     */
    private WeixinMessageHandler[] messageHandlers;
    /**
     * Packages scanned for message handler classes.
     */
    private String[] messageHandlerPackages;

    /**
     * Message interceptors: those supplied through the setter plus those
     * discovered in {@link #messageInterceptorPackages}.
     */
    private List<WeixinMessageInterceptor> messageInterceptorList;
    /**
     * Array snapshot of {@link #messageInterceptorList} (sorted by descending
     * weight), built on first use.
     */
    private WeixinMessageInterceptor[] messageInterceptors;
    /**
     * Packages scanned for message interceptor classes.
     */
    private String[] messageInterceptorPackages;

    /**
     * Optional bean factory used to obtain handler/interceptor instances; when
     * absent the classes are instantiated reflectively.
     */
    private BeanFactory beanFactory;

    /**
     * Resolves a message key to the concrete message class.
     */
    private WeixinMessageMatcher messageMatcher;
    /**
     * Cache of JAXB unmarshallers keyed by message class.
     */
    private Map<Class<? extends WeixinMessage>, Unmarshaller> messageUnmarshaller;
    /**
     * Whether to always answer the request: when no handler matches, a blank
     * response is written instead of a 404.
     */
    private boolean alwaysResponse;

    /**
     * Creates a dispatcher backed by a {@link DefaultMessageMatcher}.
     */
    public WeixinMessageDispatcher() {
        this(new DefaultMessageMatcher());
    }

    /**
     * Creates a dispatcher that resolves message classes with the given matcher.
     *
     * @param messageMatcher
     *            matcher used to map a message key to a message class
     */
    public WeixinMessageDispatcher(WeixinMessageMatcher messageMatcher) {
        this.messageMatcher = messageMatcher;
        this.messageUnmarshaller = new ConcurrentHashMap<Class<? extends WeixinMessage>, Unmarshaller>();
    }

    /**
     * Runs the whole processing chain for one request: parse, match,
     * unmarshal, select a handler, apply interceptors and write the response.
     * <p>
     * An exception raised by the handler, by the post-handle interceptors or by
     * the write is not rethrown; it is passed to
     * {@code triggerAfterCompletion} of the handler executor.
     *
     * @param context
     *            channel context the response is written to
     * @param request
     *            the WeChat request
     */
    public void doDispatch(final ChannelHandlerContext context, final WeixinRequest request) {
        WeixinMessageTransfer messageTransfer = MessageTransferHandler.parser(request);
        // Expose the parsed transfer object to the rest of the pipeline.
        context.channel().attr(ServerToolkits.MESSAGE_TRANSFER_KEY).set(messageTransfer);
        WeixinMessageKey messageKey = defineMessageKey(messageTransfer, request);
        Class<? extends WeixinMessage> targetClass = messageMatcher.match(messageKey);
        // messageRead returns null when no class was matched.
        WeixinMessage message = messageRead(request.getOriginalContent(), targetClass);
        logger.info("define '{}' matched '{}'", messageKey, targetClass);
        MessageHandlerExecutor handlerExecutor = getHandlerExecutor(context, request, messageKey, message,
                messageTransfer.getNodeNames());
        if (handlerExecutor == null || handlerExecutor.getMessageHandler() == null) {
            noHandlerFound(context, request, message);
            return;
        }
        // A pre-handle interceptor vetoed the request; nothing more is done here.
        if (!handlerExecutor.applyPreHandle(request, message)) {
            return;
        }
        Exception exception = null;
        WeixinResponse response = null;
        try {
            response = handlerExecutor.getMessageHandler().doHandle(request, message);
            handlerExecutor.applyPostHandle(request, response, message);
            context.writeAndFlush(response);
        } catch (Exception e) {
            exception = e;
        }
        handlerExecutor.triggerAfterCompletion(request, response, message, exception);
    }

    /**
     * Builds the message key used to look up the message class.
     *
     * @param messageTransfer
     *            parsed basic message information
     * @param request
     *            the WeChat request (unused by this implementation)
     * @return key made of message type, event type and account type
     */
    protected WeixinMessageKey defineMessageKey(WeixinMessageTransfer messageTransfer, WeixinRequest request) {
        return new WeixinMessageKey(messageTransfer.getMsgType(), messageTransfer.getEventType(),
                messageTransfer.getAccountType());
    }

    /**
     * Invoked when no handler matched the request. Writes a blank response
     * when {@link #openAlwaysResponse()} was called, otherwise answers with
     * HTTP 404 and closes the channel.
     *
     * @param context
     *            channel context the response is written to
     * @param request
     *            the WeChat request
     * @param message
     *            the unmarshalled message (unused by this implementation)
     */
    protected void noHandlerFound(ChannelHandlerContext context, WeixinRequest request, WeixinMessage message) {
        logger.warn("no handler found for {}", request);
        if (alwaysResponse) {
            // Flush explicitly, like every other response path in this class,
            // so the blank reply does not depend on a later flush elsewhere.
            context.writeAndFlush(BlankResponse.global);
        } else {
            FullHttpResponse response = new DefaultFullHttpResponse(request.getProtocolVersion(), NOT_FOUND);
            HttpUtil.resolveHeaders(response);
            context.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Selects the handler for a message and wraps it, together with the
     * interceptors, in a {@link MessageHandlerExecutor}.
     * <p>
     * Among all handlers whose {@code canHandle} accepts the message, the one
     * with the greatest weight is chosen.
     *
     * @param context
     *            channel context
     * @param request
     *            the WeChat request
     * @param messageKey
     *            key of the message (unused by this implementation)
     * @param message
     *            the unmarshalled message
     * @param nodeNames
     *            names of the XML nodes present in the message
     * @return the executor, or {@code null} when no handler is configured or
     *         none accepts the message
     * @see MessageHandlerExecutor
     */
    protected MessageHandlerExecutor getHandlerExecutor(ChannelHandlerContext context, WeixinRequest request,
            WeixinMessageKey messageKey, WeixinMessage message, Set<String> nodeNames) {
        WeixinMessageHandler[] messageHandlers = getMessageHandlers();
        if (messageHandlers == null) {
            return null;
        }
        logger.info("resolve message handlers '{}'", this.messageHandlerList);
        List<WeixinMessageHandler> matchedMessageHandlers = new ArrayList<WeixinMessageHandler>();
        for (WeixinMessageHandler handler : messageHandlers) {
            if (handler.canHandle(request, message, nodeNames)) {
                matchedMessageHandlers.add(handler);
            }
        }
        if (matchedMessageHandlers.isEmpty()) {
            return null;
        }
        Collections.sort(matchedMessageHandlers, new Comparator<WeixinMessageHandler>() {
            @Override
            public int compare(WeixinMessageHandler m1, WeixinMessageHandler m2) {
                return compareWeightDescending(m1.weight(), m2.weight());
            }
        });
        logger.info("matched message handlers '{}'", matchedMessageHandlers);
        return new MessageHandlerExecutor(context, matchedMessageHandlers.get(0), getMessageInterceptors());
    }

    /**
     * Returns all message handlers, resolving them on first use: classes found
     * in the configured packages are instantiated and appended to the handler
     * list, which is then cached as an array.
     *
     * @return the handlers, or {@code null} when none are configured
     * @see com.foxinmy.weixin4j.handler.WeixinMessageHandler
     */
    public WeixinMessageHandler[] getMessageHandlers() {
        if (this.messageHandlers == null) {
            if (messageHandlerPackages != null) {
                // The list is only assigned through its setter; create it here
                // so package-only configuration does not fail with a NPE.
                if (this.messageHandlerList == null) {
                    this.messageHandlerList = new ArrayList<WeixinMessageHandler>();
                }
                collectInstances(messageHandlerPackages, WeixinMessageHandler.class, this.messageHandlerList);
            }
            if (messageHandlerList != null && !this.messageHandlerList.isEmpty()) {
                this.messageHandlers = this.messageHandlerList
                        .toArray(new WeixinMessageHandler[this.messageHandlerList.size()]);
            }
        }
        return this.messageHandlers;
    }

    /**
     * Returns all message interceptors ordered by descending weight, resolving
     * them on first use: classes found in the configured packages are
     * instantiated and appended to the interceptor list, which is then sorted
     * and cached as an array.
     *
     * @return the interceptors, or {@code null} when none are configured
     * @see com.foxinmy.weixin4j.interceptor.WeixinMessageInterceptor
     */
    public WeixinMessageInterceptor[] getMessageInterceptors() {
        if (this.messageInterceptors == null) {
            if (this.messageInterceptorPackages != null) {
                // Same guard as for handlers: avoid a NPE when only packages
                // were configured.
                if (this.messageInterceptorList == null) {
                    this.messageInterceptorList = new ArrayList<WeixinMessageInterceptor>();
                }
                collectInstances(messageInterceptorPackages, WeixinMessageInterceptor.class,
                        this.messageInterceptorList);
            }
            if (this.messageInterceptorList != null && !this.messageInterceptorList.isEmpty()) {
                Collections.sort(messageInterceptorList, new Comparator<WeixinMessageInterceptor>() {
                    @Override
                    public int compare(WeixinMessageInterceptor m1, WeixinMessageInterceptor m2) {
                        return compareWeightDescending(m1.weight(), m2.weight());
                    }
                });
                this.messageInterceptors = this.messageInterceptorList
                        .toArray(new WeixinMessageInterceptor[this.messageInterceptorList.size()]);
            }
        }
        logger.info("resolve message interceptors '{}'", this.messageInterceptorList);
        return this.messageInterceptors;
    }

    /**
     * Scans the given packages and appends one instance of every concrete
     * class assignable to {@code requiredType} to {@code target}.
     * <p>
     * With a bean factory the instance is obtained from it; if that lookup
     * throws a {@link RuntimeException} (presumably because several beans
     * match), the bean whose exact class equals the scanned class is used
     * instead. Without a bean factory the no-argument constructor is invoked
     * reflectively.
     *
     * @param packageNames
     *            packages to scan
     * @param requiredType
     *            type every collected instance must implement
     * @param target
     *            list the instances are appended to
     * @throws RuntimeException
     *             if a class cannot be instantiated reflectively
     */
    private <T> void collectInstances(String[] packageNames, Class<T> requiredType, List<T> target) {
        List<Class<?>> candidates = new ArrayList<Class<?>>();
        for (String packageName : packageNames) {
            candidates.addAll(ClassUtil.getClasses(packageName));
        }
        for (Class<?> clazz : candidates) {
            // Only concrete implementations of the required type qualify.
            if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())
                    || !requiredType.isAssignableFrom(clazz)) {
                continue;
            }
            if (beanFactory != null) {
                try {
                    target.add(requiredType.cast(beanFactory.getBean(clazz)));
                } catch (RuntimeException ex) { // multiple
                    for (Object bean : beanFactory.getBeans(clazz).values()) {
                        if (bean.getClass() == clazz) {
                            target.add(requiredType.cast(bean));
                            break;
                        }
                    }
                }
            } else {
                try {
                    Constructor<?> ctor = clazz.getDeclaredConstructor();
                    ServerToolkits.makeConstructorAccessible(ctor);
                    target.add(requiredType.cast(ctor.newInstance((Object[]) null)));
                } catch (Exception ex) {
                    throw new RuntimeException(clazz.getName() + " instantiate fail", ex);
                }
            }
        }
    }

    /**
     * Compares two weights so that the greater weight sorts first. Written as
     * an explicit comparison because subtracting the weights can overflow.
     *
     * @param leftWeight
     *            weight of the first element
     * @param rightWeight
     *            weight of the second element
     * @return negative, zero or positive as the first element sorts before,
     *         equal to or after the second
     */
    private static int compareWeightDescending(int leftWeight, int rightWeight) {
        if (leftWeight == rightWeight) {
            return 0;
        }
        return leftWeight < rightWeight ? 1 : -1;
    }

    /**
     * Unmarshals an XML message into the given message class using JAXB.
     *
     * @param message
     *            the XML message
     * @param clazz
     *            target message class; may be {@code null}
     * @return the message object, or {@code null} when {@code clazz} is
     *         {@code null}
     * @throws RuntimeException
     *             wrapping the {@link JAXBException} if unmarshalling fails
     */
    protected <M extends WeixinMessage> M messageRead(String message, Class<M> clazz) {
        if (clazz == null) {
            return null;
        }
        Unmarshaller unmarshaller = getUnmarshaller(clazz);
        Source source = new StreamSource(new ByteArrayInputStream(ServerToolkits.getBytesUtf8(message)));
        try {
            // Unmarshaller instances are cached and shared but are not
            // thread-safe, so concurrent use of one instance is serialized.
            synchronized (unmarshaller) {
                JAXBElement<M> jaxbElement = unmarshaller.unmarshal(source, clazz);
                return jaxbElement.getValue();
            }
        } catch (JAXBException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the cached JAXB unmarshaller for a message class, creating and
     * caching it on first use.
     *
     * @param clazz
     *            message class
     * @return the unmarshaller for that class
     * @throws RuntimeException
     *             wrapping the {@link JAXBException} if the JAXB context or
     *             unmarshaller cannot be created
     */
    protected Unmarshaller getUnmarshaller(Class<? extends WeixinMessage> clazz) {
        Unmarshaller unmarshaller = messageUnmarshaller.get(clazz);
        if (unmarshaller == null) {
            try {
                JAXBContext jaxbContext = JAXBContext.newInstance(clazz);
                unmarshaller = jaxbContext.createUnmarshaller();
                messageUnmarshaller.put(clazz, unmarshaller);
            } catch (JAXBException e) {
                throw new RuntimeException(e);
            }
        }
        return unmarshaller;
    }

    public void setMessageHandlerList(List<WeixinMessageHandler> messageHandlerList) {
        this.messageHandlerList = messageHandlerList;
    }

    public void setMessageInterceptorList(List<WeixinMessageInterceptor> messageInterceptorList) {
        this.messageInterceptorList = messageInterceptorList;
    }

    public String[] getMessageHandlerPackages() {
        return messageHandlerPackages;
    }

    public String[] getMessageInterceptorPackages() {
        return messageInterceptorPackages;
    }

    public void setMessageHandlerPackages(String... messageHandlerPackages) {
        this.messageHandlerPackages = messageHandlerPackages;
    }

    public void setMessageInterceptorPackages(String... messageInterceptorPackages) {
        this.messageInterceptorPackages = messageInterceptorPackages;
    }

    public BeanFactory getBeanFactory() {
        return beanFactory;
    }

    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /**
     * Registers the message class to use for the given message key with the
     * underlying matcher.
     *
     * @param messageKey
     *            key of the message
     * @param messageClass
     *            class the message is unmarshalled into
     */
    public void registMessageClass(WeixinMessageKey messageKey, Class<? extends WeixinMessage> messageClass) {
        messageMatcher.regist(messageKey, messageClass);
    }

    public WeixinMessageMatcher getMessageMatcher() {
        return this.messageMatcher;
    }

    /**
     * Turns on the always-respond switch: when no handler matches a message, a
     * blank response is written instead of HTTP 404.
     */
    public void openAlwaysResponse() {
        this.alwaysResponse = true;
    }
}