001
014
015 package com.liferay.portal.resiliency.spi.agent;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021 import com.liferay.portal.kernel.resiliency.spi.SPI;
022 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023 import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025 import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026 import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027 import com.liferay.portal.kernel.util.CharPool;
028 import com.liferay.portal.kernel.util.InetAddressUtil;
029 import com.liferay.portal.kernel.util.StringUtil;
030 import com.liferay.portal.kernel.util.WebKeys;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.io.DataInput;
034 import java.io.DataInputStream;
035 import java.io.File;
036 import java.io.IOException;
037 import java.io.InputStream;
038 import java.io.OutputStream;
039
040 import java.net.InetSocketAddress;
041 import java.net.Socket;
042 import java.net.SocketAddress;
043 import java.net.UnknownHostException;
044
045 import java.nio.charset.Charset;
046
047 import java.util.Iterator;
048 import java.util.concurrent.ArrayBlockingQueue;
049 import java.util.concurrent.BlockingQueue;
050
051 import javax.servlet.http.HttpServletRequest;
052 import javax.servlet.http.HttpServletResponse;
053
054
057 public class HttpClientSPIAgent implements SPIAgent {
058
059 public HttpClientSPIAgent(
060 SPIConfiguration spiConfiguration,
061 RegistrationReference registrationReference)
062 throws UnknownHostException {
063
064 this.registrationReference = registrationReference;
065
066 socketAddress = new InetSocketAddress(
067 InetAddressUtil.getLoopbackInetAddress(),
068 spiConfiguration.getConnectorPort());
069 socketBlockingQueue = new ArrayBlockingQueue<Socket>(
070 PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
071
072 String httpServletRequestContentString =
073 "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
074 " HTTP/1.1\r\nHost: localhost:" +
075 spiConfiguration.getConnectorPort() + "\r\n" +
076 "Content-Length: 8\r\n\r\n";
077
078 httpServletRequestContent = httpServletRequestContentString.getBytes(
079 Charset.forName("US-ASCII"));
080 }
081
082 @Override
083 public void destroy() {
084 Iterator<Socket> iterator = socketBlockingQueue.iterator();
085
086 while (iterator.hasNext()) {
087 Socket socket = iterator.next();
088
089 iterator.remove();
090
091 try {
092 socket.close();
093 }
094 catch (IOException ioe) {
095 if (_log.isWarnEnabled()) {
096 _log.warn(ioe, ioe);
097 }
098 }
099 }
100 }
101
102 @Override
103 public void init(SPI spi) throws PortalResiliencyException {
104 try {
105 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
106
107 spi.addServlet(
108 SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
109 MAPPING_PATTERN, AcceptorServlet.class.getName());
110 }
111 catch (Exception e) {
112 throw new PortalResiliencyException(e);
113 }
114 }
115
116 @Override
117 public HttpServletRequest prepareRequest(HttpServletRequest request)
118 throws IOException {
119
120 SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
121 request.getInputStream());
122
123 HttpServletRequest spiAgentHttpServletRequest =
124 spiAgentRequest.populateRequest(request);
125
126 spiAgentHttpServletRequest.setAttribute(
127 WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
128
129 return spiAgentHttpServletRequest;
130 }
131
132 @Override
133 public HttpServletResponse prepareResponse(
134 HttpServletRequest request, HttpServletResponse response) {
135
136 HttpServletResponse spiAgentHttpServletResponse =
137 new BufferCacheServletResponse(
138 new ReadOnlyServletResponse(response));
139
140 request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
141 request.setAttribute(
142 WebKeys.SPI_AGENT_RESPONSE, new SPIAgentResponse());
143
144 return spiAgentHttpServletResponse;
145 }
146
147 @Override
148 public void service(
149 HttpServletRequest request, HttpServletResponse response)
150 throws PortalResiliencyException {
151
152 Socket socket = null;
153
154 try {
155 socket = borrowSocket();
156
157 SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
158
159 OutputStream outputStream = socket.getOutputStream();
160
161 outputStream.write(httpServletRequestContent);
162
163 spiAgentRequest.writeTo(registrationReference, outputStream);
164
165 InputStream inputStream = socket.getInputStream();
166
167 DataInputStream dataInputStream = new DataInputStream(inputStream);
168
169 boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
170
171 SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
172 dataInputStream);
173
174 spiAgentResponse.populate(request, response);
175
176 returnSocket(socket, forceCloseSocket);
177
178 socket = null;
179 }
180 catch (IOException ioe) {
181 throw new PortalResiliencyException(ioe);
182 }
183 finally {
184 if (socket != null) {
185 try {
186 socket.close();
187 }
188 catch (IOException ioe) {
189 if (_log.isWarnEnabled()) {
190 _log.warn(ioe, ioe);
191 }
192 }
193 }
194 }
195 }
196
197 @Override
198 public void transferResponse(
199 HttpServletRequest request, HttpServletResponse response,
200 Exception exception)
201 throws IOException {
202
203 SPIAgentRequest spiAgentRequest = (SPIAgentRequest)request.getAttribute(
204 WebKeys.SPI_AGENT_REQUEST);
205
206 request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
207
208 File requestBodyFile = spiAgentRequest.requestBodyFile;
209
210 if (requestBodyFile != null) {
211 if (!requestBodyFile.delete()) {
212 requestBodyFile.deleteOnExit();
213 }
214 }
215
216 SPIAgentResponse spiAgentResponse =
217 (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
218
219 request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
220
221 if (exception != null) {
222 spiAgentResponse.setException(exception);
223 }
224 else {
225 BufferCacheServletResponse bufferCacheServletResponse =
226 (BufferCacheServletResponse)response;
227
228 spiAgentResponse.captureResponse(
229 request, bufferCacheServletResponse);
230 }
231
232 HttpServletResponse originalResponse =
233 (HttpServletResponse)request.getAttribute(
234 WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
235
236 request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
237
238 originalResponse.setContentLength(8);
239
240 spiAgentResponse.writeTo(
241 registrationReference, originalResponse.getOutputStream());
242 }
243
244 protected Socket borrowSocket() throws IOException {
245 Socket socket = socketBlockingQueue.poll();
246
247 if (socket != null) {
248 if (socket.isClosed() || !socket.isConnected() ||
249 socket.isInputShutdown() || socket.isOutputShutdown()) {
250
251 try {
252 socket.close();
253 }
254 catch (IOException ioe) {
255 if (_log.isWarnEnabled()) {
256 _log.warn(ioe, ioe);
257 }
258 }
259
260 socket = null;
261 }
262 }
263
264 if (socket == null) {
265 socket = new Socket();
266
267 socket.connect(socketAddress);
268 }
269
270 return socket;
271 }
272
273 protected boolean consumeHttpResponseHead(DataInput dataInput)
274 throws IOException {
275
276 String statusLine = dataInput.readLine();
277
278 if (!statusLine.equals("HTTP/1.1 200 OK")) {
279 throw new IOException("Error status line: " + statusLine);
280 }
281
282 boolean forceCloseSocket = false;
283
284 String line = null;
285
286 while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
287 String[] headerKeyValuePair = StringUtil.split(
288 line, CharPool.COLON);
289
290 String headerName = headerKeyValuePair[0].trim();
291
292 headerName = StringUtil.toLowerCase(headerName);
293
294 if (headerName.equals("connection")) {
295 String headerValue = headerKeyValuePair[1].trim();
296
297 headerValue = StringUtil.toLowerCase(headerValue);
298
299 if (headerValue.equals("close")) {
300 forceCloseSocket = true;
301 }
302 }
303 }
304
305 return forceCloseSocket;
306 }
307
308 protected void returnSocket(Socket socket, boolean forceCloseSocket) {
309 boolean pooled = false;
310
311 if (!forceCloseSocket && socket.isConnected() &&
312 !socket.isInputShutdown() && !socket.isOutputShutdown()) {
313
314 pooled = socketBlockingQueue.offer(socket);
315 }
316
317 if (!pooled) {
318 try {
319 socket.close();
320 }
321 catch (IOException ioe) {
322 if (_log.isWarnEnabled()) {
323 _log.warn(ioe, ioe);
324 }
325 }
326 }
327 }
328
329 protected static final String MAPPING_PATTERN = "/acceptor";
330
331 protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
332
333 protected final byte[] httpServletRequestContent;
334 protected final RegistrationReference registrationReference;
335 protected final SocketAddress socketAddress;
336 protected final BlockingQueue<Socket> socketBlockingQueue;
337
338 private static Log _log = LogFactoryUtil.getLog(HttpClientSPIAgent.class);
339
340 }