001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.resiliency.spi.agent;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021    import com.liferay.portal.kernel.resiliency.spi.SPI;
022    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023    import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025    import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026    import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027    import com.liferay.portal.kernel.util.CharPool;
028    import com.liferay.portal.kernel.util.InetAddressUtil;
029    import com.liferay.portal.kernel.util.StringUtil;
030    import com.liferay.portal.kernel.util.WebKeys;
031    import com.liferay.portal.util.PropsValues;
032    
033    import java.io.DataInput;
034    import java.io.DataInputStream;
035    import java.io.File;
036    import java.io.IOException;
037    import java.io.InputStream;
038    import java.io.OutputStream;
039    
040    import java.net.InetSocketAddress;
041    import java.net.Socket;
042    import java.net.SocketAddress;
043    import java.net.UnknownHostException;
044    
045    import java.nio.charset.Charset;
046    
047    import java.util.Iterator;
048    import java.util.concurrent.ArrayBlockingQueue;
049    import java.util.concurrent.BlockingQueue;
050    
051    import javax.servlet.http.HttpServletRequest;
052    import javax.servlet.http.HttpServletResponse;
053    
054    /**
055     * @author Shuyang Zhou
056     */
057    public class HttpClientSPIAgent implements SPIAgent {
058    
059            public HttpClientSPIAgent(
060                            SPIConfiguration spiConfiguration,
061                            RegistrationReference registrationReference)
062                    throws UnknownHostException {
063    
064                    this.registrationReference = registrationReference;
065    
066                    socketAddress = new InetSocketAddress(
067                            InetAddressUtil.getLoopbackInetAddress(),
068                            spiConfiguration.getConnectorPort());
069                    socketBlockingQueue = new ArrayBlockingQueue<Socket>(
070                            PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
071    
072                    String httpServletRequestContentString =
073                            "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
074                                    " HTTP/1.1\r\nHost: localhost:" +
075                                            spiConfiguration.getConnectorPort() + "\r\n" +
076                                                    "Content-Length: 8\r\n\r\n";
077    
078                    httpServletRequestContent = httpServletRequestContentString.getBytes(
079                            Charset.forName("US-ASCII"));
080            }
081    
082            @Override
083            public void destroy() {
084                    Iterator<Socket> iterator = socketBlockingQueue.iterator();
085    
086                    while (iterator.hasNext()) {
087                            Socket socket = iterator.next();
088    
089                            iterator.remove();
090    
091                            try {
092                                    socket.close();
093                            }
094                            catch (IOException ioe) {
095                                    if (_log.isWarnEnabled()) {
096                                            _log.warn(ioe, ioe);
097                                    }
098                            }
099                    }
100            }
101    
102            @Override
103            public void init(SPI spi) throws PortalResiliencyException {
104                    try {
105                            SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
106    
107                            spi.addServlet(
108                                    SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
109                                    MAPPING_PATTERN, AcceptorServlet.class.getName());
110                    }
111                    catch (Exception e) {
112                            throw new PortalResiliencyException(e);
113                    }
114            }
115    
116            @Override
117            public HttpServletRequest prepareRequest(HttpServletRequest request)
118                    throws IOException {
119    
120                    SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
121                            request.getInputStream());
122    
123                    HttpServletRequest spiAgentHttpServletRequest =
124                            spiAgentRequest.populateRequest(request);
125    
126                    spiAgentHttpServletRequest.setAttribute(
127                            WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
128    
129                    return spiAgentHttpServletRequest;
130            }
131    
132            @Override
133            public HttpServletResponse prepareResponse(
134                    HttpServletRequest request, HttpServletResponse response) {
135    
136                    HttpServletResponse spiAgentHttpServletResponse =
137                            new BufferCacheServletResponse(
138                                    new ReadOnlyServletResponse(response));
139    
140                    request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
141                    request.setAttribute(
142                            WebKeys.SPI_AGENT_RESPONSE, new SPIAgentResponse());
143    
144                    return spiAgentHttpServletResponse;
145            }
146    
147            @Override
148            public void service(
149                            HttpServletRequest request, HttpServletResponse response)
150                    throws PortalResiliencyException {
151    
152                    Socket socket = null;
153    
154                    try {
155                            socket = borrowSocket();
156    
157                            SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
158    
159                            OutputStream outputStream = socket.getOutputStream();
160    
161                            outputStream.write(httpServletRequestContent);
162    
163                            spiAgentRequest.writeTo(registrationReference, outputStream);
164    
165                            InputStream inputStream = socket.getInputStream();
166    
167                            DataInputStream dataInputStream = new DataInputStream(inputStream);
168    
169                            boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
170    
171                            SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
172                                    dataInputStream);
173    
174                            spiAgentResponse.populate(request, response);
175    
176                            returnSocket(socket, forceCloseSocket);
177    
178                            socket = null;
179                    }
180                    catch (IOException ioe) {
181                            throw new PortalResiliencyException(ioe);
182                    }
183                    finally {
184                            if (socket != null) {
185                                    try {
186                                            socket.close();
187                                    }
188                                    catch (IOException ioe) {
189                                            if (_log.isWarnEnabled()) {
190                                                    _log.warn(ioe, ioe);
191                                            }
192                                    }
193                            }
194                    }
195            }
196    
197            @Override
198            public void transferResponse(
199                            HttpServletRequest request, HttpServletResponse response,
200                            Exception exception)
201                    throws IOException {
202    
203                    SPIAgentRequest spiAgentRequest = (SPIAgentRequest)request.getAttribute(
204                            WebKeys.SPI_AGENT_REQUEST);
205    
206                    request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
207    
208                    File requestBodyFile = spiAgentRequest.requestBodyFile;
209    
210                    if (requestBodyFile != null) {
211                            if (!requestBodyFile.delete()) {
212                                    requestBodyFile.deleteOnExit();
213                            }
214                    }
215    
216                    SPIAgentResponse spiAgentResponse =
217                            (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
218    
219                    request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
220    
221                    if (exception != null) {
222                            spiAgentResponse.setException(exception);
223                    }
224                    else {
225                            BufferCacheServletResponse bufferCacheServletResponse =
226                                    (BufferCacheServletResponse)response;
227    
228                            spiAgentResponse.captureResponse(
229                                    request, bufferCacheServletResponse);
230                    }
231    
232                    HttpServletResponse originalResponse =
233                            (HttpServletResponse)request.getAttribute(
234                                    WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
235    
236                    request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
237    
238                    originalResponse.setContentLength(8);
239    
240                    spiAgentResponse.writeTo(
241                            registrationReference, originalResponse.getOutputStream());
242            }
243    
244            protected Socket borrowSocket() throws IOException {
245                    Socket socket = socketBlockingQueue.poll();
246    
247                    if (socket != null) {
248                            if (socket.isClosed() || !socket.isConnected() ||
249                                    socket.isInputShutdown() || socket.isOutputShutdown()) {
250    
251                                    try {
252                                            socket.close();
253                                    }
254                                    catch (IOException ioe) {
255                                            if (_log.isWarnEnabled()) {
256                                                    _log.warn(ioe, ioe);
257                                            }
258                                    }
259    
260                                    socket = null;
261                            }
262                    }
263    
264                    if (socket == null) {
265                            socket = new Socket();
266    
267                            socket.connect(socketAddress);
268                    }
269    
270                    return socket;
271            }
272    
273            protected boolean consumeHttpResponseHead(DataInput dataInput)
274                    throws IOException {
275    
276                    String statusLine = dataInput.readLine();
277    
278                    if (!statusLine.equals("HTTP/1.1 200 OK")) {
279                            throw new IOException("Error status line: " + statusLine);
280                    }
281    
282                    boolean forceCloseSocket = false;
283    
284                    String line = null;
285    
286                    while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
287                            String[] headerKeyValuePair = StringUtil.split(
288                                    line, CharPool.COLON);
289    
290                            String headerName = headerKeyValuePair[0].trim();
291    
292                            headerName = StringUtil.toLowerCase(headerName);
293    
294                            if (headerName.equals("connection")) {
295                                    String headerValue = headerKeyValuePair[1].trim();
296    
297                                    headerValue = StringUtil.toLowerCase(headerValue);
298    
299                                    if (headerValue.equals("close")) {
300                                            forceCloseSocket = true;
301                                    }
302                            }
303                    }
304    
305                    return forceCloseSocket;
306            }
307    
308            protected void returnSocket(Socket socket, boolean forceCloseSocket) {
309                    boolean pooled = false;
310    
311                    if (!forceCloseSocket && socket.isConnected() &&
312                            !socket.isInputShutdown() && !socket.isOutputShutdown()) {
313    
314                            pooled = socketBlockingQueue.offer(socket);
315                    }
316    
317                    if (!pooled) {
318                            try {
319                                    socket.close();
320                            }
321                            catch (IOException ioe) {
322                                    if (_log.isWarnEnabled()) {
323                                            _log.warn(ioe, ioe);
324                                    }
325                            }
326                    }
327            }
328    
329            protected static final String MAPPING_PATTERN = "/acceptor";
330    
331            protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
332    
333            protected final byte[] httpServletRequestContent;
334            protected final RegistrationReference registrationReference;
335            protected final SocketAddress socketAddress;
336            protected final BlockingQueue<Socket> socketBlockingQueue;
337    
338            private static Log _log = LogFactoryUtil.getLog(HttpClientSPIAgent.class);
339    
340    }