1 package com.foxinmy.weixin4j.http.support.netty;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.buffer.ByteBuf;
5 import io.netty.buffer.ByteBufAllocator;
6 import io.netty.buffer.ByteBufOutputStream;
7 import io.netty.channel.Channel;
8 import io.netty.channel.ChannelFuture;
9 import io.netty.channel.ChannelFutureListener;
10 import io.netty.channel.ChannelHandlerContext;
11 import io.netty.channel.SimpleChannelInboundHandler;
12 import io.netty.handler.codec.http.DefaultFullHttpRequest;
13 import io.netty.handler.codec.http.DefaultHttpRequest;
14 import io.netty.handler.codec.http.FullHttpResponse;
15 import io.netty.handler.codec.http.HttpMethod;
16 import io.netty.handler.codec.http.HttpVersion;
17 import io.netty.handler.ssl.SslHandler;
18
19 import java.io.IOException;
20 import java.net.InetAddress;
21 import java.net.InetSocketAddress;
22 import java.net.URI;
23 import java.util.List;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ExecutionException;
26
27 import javax.net.ssl.SSLContext;
28 import javax.net.ssl.SSLEngine;
29
30 import com.foxinmy.weixin4j.http.AbstractHttpClient;
31 import com.foxinmy.weixin4j.http.HttpClientException;
32 import com.foxinmy.weixin4j.http.HttpHeaders;
33 import com.foxinmy.weixin4j.http.HttpParams;
34 import com.foxinmy.weixin4j.http.HttpRequest;
35 import com.foxinmy.weixin4j.http.HttpResponse;
36 import com.foxinmy.weixin4j.http.entity.HttpEntity;
37 import com.foxinmy.weixin4j.http.factory.HttpClientFactory;
38 import com.foxinmy.weixin4j.util.Consts;
39 import com.foxinmy.weixin4j.util.SettableFuture;
40 import com.foxinmy.weixin4j.util.StringUtil;
41
42
43
44
45
46
47
48
49
50
51 public class Netty4HttpClient extends AbstractHttpClient {
52
53 private final Bootstrap bootstrap;
54 private final HttpParams params;
55
56 public Netty4HttpClient(Bootstrap bootstrap, HttpParams params) {
57 this.bootstrap = bootstrap;
58 this.params = params;
59 }
60
61 @Override
62 public HttpResponse execute(final HttpRequest request)
63 throws HttpClientException {
64 HttpResponse response = null;
65 try {
66 final URI uri = request.getURI();
67 final SettableFuture<HttpResponse> future = new SettableFuture<HttpResponse>();
68 ChannelFutureListener listener = new ChannelFutureListener() {
69 @Override
70 public void operationComplete(ChannelFuture channelFuture)
71 throws Exception {
72 if (channelFuture.isSuccess()) {
73 Channel channel = channelFuture.channel();
74 if ("https".equals(uri.getScheme())) {
75 SSLContext sslContext;
76 if (params != null
77 && params.getSSLContext() != null) {
78 sslContext = params.getSSLContext();
79 } else {
80 sslContext = HttpClientFactory
81 .allowSSLContext();
82 }
83 SSLEngine sslEngine = sslContext.createSSLEngine();
84 sslEngine.setUseClientMode(true);
85 channel.pipeline().addFirst(
86 new SslHandler(sslEngine));
87 }
88 channel.pipeline().addLast(new RequestHandler(future));
89 DefaultHttpRequest uriRequest = createRequest(request);
90 channel.writeAndFlush(uriRequest);
91 } else {
92 future.setException(channelFuture.cause());
93 }
94 }
95 };
96 InetSocketAddress address = params != null
97 && params.getProxy() != null ? (InetSocketAddress) params
98 .getProxy().address() : new InetSocketAddress(
99 InetAddress.getByName(uri.getHost()), getPort(uri));
100 bootstrap.connect(address).addListener(listener);
101 response = future.get();
102 handleResponse(response);
103 } catch (IOException e) {
104 throw new HttpClientException("I/O error on "
105 + request.getMethod().name() + " request for \""
106 + request.getURI().toString(), e);
107 } catch (InterruptedException e) {
108 throw new HttpClientException("Execute error on "
109 + request.getMethod().name() + " request for \""
110 + request.getURI().toString(), e);
111 } catch (ExecutionException e) {
112 throw new HttpClientException("Execute error on "
113 + request.getMethod().name() + " request for \""
114 + request.getURI().toString(), e);
115 } finally {
116 if (response != null) {
117 response.close();
118 }
119 }
120 return response;
121 }
122
123 private DefaultHttpRequest createRequest(HttpRequest request)
124 throws IOException {
125 HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
126 URI uri = request.getURI();
127 String url = StringUtil.isBlank(uri.getRawPath()) ? "/" : uri
128 .getRawPath();
129 if (StringUtil.isNotBlank(uri.getRawQuery())) {
130 url += "?" + uri.getRawQuery();
131 }
132 DefaultHttpRequest uriRequest = new DefaultHttpRequest(
133 HttpVersion.HTTP_1_1, method, url);
134
135 HttpEntity entity = request.getEntity();
136 if (entity != null) {
137 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
138 ByteBufOutputStream out = new ByteBufOutputStream(byteBuf);
139 entity.writeTo(out);
140 out.flush();
141 out.close();
142 uriRequest = new DefaultFullHttpRequest(
143 uriRequest.getProtocolVersion(), uriRequest.getMethod(),
144 uriRequest.getUri(), byteBuf);
145 if (entity.getContentType() != null) {
146 uriRequest.headers().add(HttpHeaders.CONTENT_TYPE,
147 entity.getContentType().toString());
148 }
149 if (entity.getContentLength() < 0) {
150 uriRequest.headers().add(HttpHeaders.TRANSFER_ENCODING,
151 io.netty.handler.codec.http.HttpHeaders.Values.CHUNKED);
152 } else {
153 uriRequest.headers().add(HttpHeaders.CONTENT_LENGTH,
154 entity.getContentLength());
155 }
156 }
157
158 HttpHeaders headers = request.getHeaders();
159 if (headers == null) {
160 headers = new HttpHeaders();
161 }
162 if (!headers.containsKey(HttpHeaders.HOST)) {
163 headers.set(HttpHeaders.HOST, uri.getHost());
164 }
165
166 if (!headers.containsKey(HttpHeaders.ACCEPT)) {
167 headers.set(HttpHeaders.ACCEPT, "*/*");
168 }
169
170 if (!headers.containsKey(HttpHeaders.USER_AGENT)) {
171 headers.set(HttpHeaders.USER_AGENT, "netty/httpclient");
172 }
173 for (Entry<String, List<String>> header : headers.entrySet()) {
174 uriRequest.headers().set(header.getKey(), header.getValue());
175 }
176 uriRequest.headers().set(HttpHeaders.ACCEPT_CHARSET,
177 Consts.UTF_8.displayName());
178 uriRequest.headers().set(HttpHeaders.CONNECTION,
179 io.netty.handler.codec.http.HttpHeaders.Values.CLOSE);
180 return uriRequest;
181 }
182
183 private int getPort(URI uri) {
184 int port = uri.getPort();
185 if (port == -1) {
186 if ("http".equalsIgnoreCase(uri.getScheme())) {
187 port = 80;
188 } else if ("https".equalsIgnoreCase(uri.getScheme())) {
189 port = 443;
190 }
191 }
192 return port;
193 }
194
195 private static class RequestHandler extends
196 SimpleChannelInboundHandler<FullHttpResponse> {
197
198 private final SettableFuture<HttpResponse> future;
199
200 public RequestHandler(SettableFuture<HttpResponse> future) {
201 this.future = future;
202 }
203
204 @Override
205 protected void channelRead0(ChannelHandlerContext context,
206 FullHttpResponse response) throws Exception {
207 byte[] content = null;
208 ByteBuf byteBuf = response.content();
209 if (byteBuf.hasArray()) {
210 content = byteBuf.array();
211 } else {
212 content = new byte[byteBuf.readableBytes()];
213 byteBuf.readBytes(content);
214 }
215 future.set(new Netty4HttpResponse(context, response, content));
216 }
217
218 @Override
219 public void exceptionCaught(ChannelHandlerContext context,
220 Throwable cause) throws Exception {
221 future.setException(cause);
222 }
223 }
224 }