001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.resiliency.spi.agent;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021    import com.liferay.portal.kernel.resiliency.spi.SPI;
022    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023    import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025    import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026    import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027    import com.liferay.portal.kernel.util.CharPool;
028    import com.liferay.portal.kernel.util.InetAddressUtil;
029    import com.liferay.portal.kernel.util.StringUtil;
030    import com.liferay.portal.kernel.util.WebKeys;
031    import com.liferay.portal.util.PropsValues;
032    
033    import java.io.DataInput;
034    import java.io.DataInputStream;
035    import java.io.IOException;
036    import java.io.InputStream;
037    import java.io.OutputStream;
038    
039    import java.net.InetSocketAddress;
040    import java.net.Socket;
041    import java.net.SocketAddress;
042    import java.net.UnknownHostException;
043    
044    import java.nio.charset.Charset;
045    
046    import java.util.Iterator;
047    import java.util.concurrent.ArrayBlockingQueue;
048    import java.util.concurrent.BlockingQueue;
049    
050    import javax.servlet.http.HttpServletRequest;
051    import javax.servlet.http.HttpServletResponse;
052    
053    /**
054     * @author Shuyang Zhou
055     */
056    public class HttpClientSPIAgent implements SPIAgent {
057    
058            public HttpClientSPIAgent(
059                            SPIConfiguration spiConfiguration,
060                            RegistrationReference registrationReference)
061                    throws UnknownHostException {
062    
063                    this.registrationReference = registrationReference;
064    
065                    socketAddress = new InetSocketAddress(
066                            InetAddressUtil.getLoopbackInetAddress(),
067                            spiConfiguration.getConnectorPort());
068                    socketBlockingQueue = new ArrayBlockingQueue<Socket>(
069                            PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
070    
071                    String httpServletRequestContentString =
072                            "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
073                                    " HTTP/1.1\r\nHost: localhost:" +
074                                            spiConfiguration.getConnectorPort() + "\r\n" +
075                                                    "Content-Length: 8\r\n\r\n";
076    
077                    httpServletRequestContent = httpServletRequestContentString.getBytes(
078                            Charset.forName("US-ASCII"));
079            }
080    
081            @Override
082            public void destroy() {
083                    Iterator<Socket> iterator = socketBlockingQueue.iterator();
084    
085                    while (iterator.hasNext()) {
086                            Socket socket = iterator.next();
087    
088                            iterator.remove();
089    
090                            try {
091                                    socket.close();
092                            }
093                            catch (IOException ioe) {
094                                    if (_log.isWarnEnabled()) {
095                                            _log.warn(ioe, ioe);
096                                    }
097                            }
098                    }
099            }
100    
101            @Override
102            public void init(SPI spi) throws PortalResiliencyException {
103                    try {
104                            SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
105    
106                            spi.addServlet(
107                                    SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
108                                    MAPPING_PATTERN, AcceptorServlet.class.getName());
109                    }
110                    catch (Exception e) {
111                            throw new PortalResiliencyException(e);
112                    }
113            }
114    
115            @Override
116            public HttpServletRequest prepareRequest(HttpServletRequest request)
117                    throws IOException {
118    
119                    SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
120                            request.getInputStream());
121    
122                    HttpServletRequest spiAgentHttpServletRequest =
123                            spiAgentRequest.populateRequest(request);
124    
125                    spiAgentHttpServletRequest.setAttribute(
126                            WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
127    
128                    return spiAgentHttpServletRequest;
129            }
130    
131            @Override
132            public HttpServletResponse prepareResponse(
133                    HttpServletRequest request, HttpServletResponse response) {
134    
135                    HttpServletResponse spiAgentHttpServletResponse =
136                            new BufferCacheServletResponse(
137                                    new ReadOnlyServletResponse(response));
138    
139                    request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
140                    request.setAttribute(
141                            WebKeys.SPI_AGENT_RESPONSE, new SPIAgentResponse());
142    
143                    return spiAgentHttpServletResponse;
144            }
145    
146            @Override
147            public void service(
148                            HttpServletRequest request, HttpServletResponse response)
149                    throws PortalResiliencyException {
150    
151                    Socket socket = null;
152    
153                    try {
154                            socket = borrowSocket();
155    
156                            SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
157    
158                            OutputStream outputStream = socket.getOutputStream();
159    
160                            outputStream.write(httpServletRequestContent);
161    
162                            spiAgentRequest.writeTo(registrationReference, outputStream);
163    
164                            InputStream inputStream = socket.getInputStream();
165    
166                            DataInputStream dataInputStream = new DataInputStream(inputStream);
167    
168                            boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
169    
170                            SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
171                                    dataInputStream);
172    
173                            spiAgentResponse.populate(request, response);
174    
175                            returnSocket(socket, forceCloseSocket);
176    
177                            socket = null;
178                    }
179                    catch (IOException ioe) {
180                            throw new PortalResiliencyException(ioe);
181                    }
182                    finally {
183                            if (socket != null) {
184                                    try {
185                                            socket.close();
186                                    }
187                                    catch (IOException ioe) {
188                                            if (_log.isWarnEnabled()) {
189                                                    _log.warn(ioe, ioe);
190                                            }
191                                    }
192                            }
193                    }
194            }
195    
196            @Override
197            public void transferResponse(
198                            HttpServletRequest request, HttpServletResponse response,
199                            Exception exception)
200                    throws IOException {
201    
202                    request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
203    
204                    SPIAgentResponse spiAgentResponse =
205                            (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
206    
207                    request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
208    
209                    if (exception != null) {
210                            spiAgentResponse.setException(exception);
211                    }
212                    else {
213                            BufferCacheServletResponse bufferCacheServletResponse =
214                                    (BufferCacheServletResponse)response;
215    
216                            spiAgentResponse.captureResponse(
217                                    request, bufferCacheServletResponse);
218                    }
219    
220                    HttpServletResponse originalResponse =
221                            (HttpServletResponse)request.getAttribute(
222                                    WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
223    
224                    request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
225    
226                    originalResponse.setContentLength(8);
227    
228                    spiAgentResponse.writeTo(
229                            registrationReference, originalResponse.getOutputStream());
230            }
231    
232            protected Socket borrowSocket() throws IOException {
233                    Socket socket = socketBlockingQueue.poll();
234    
235                    if (socket != null) {
236                            if (socket.isClosed() || !socket.isConnected() ||
237                                    socket.isInputShutdown() || socket.isOutputShutdown()) {
238    
239                                    try {
240                                            socket.close();
241                                    }
242                                    catch (IOException ioe) {
243                                            if (_log.isWarnEnabled()) {
244                                                    _log.warn(ioe, ioe);
245                                            }
246                                    }
247    
248                                    socket = null;
249                            }
250                    }
251    
252                    if (socket == null) {
253                            socket = new Socket();
254    
255                            socket.connect(socketAddress);
256                    }
257    
258                    return socket;
259            }
260    
261            protected boolean consumeHttpResponseHead(DataInput dataInput)
262                    throws IOException {
263    
264                    String statusLine = dataInput.readLine();
265    
266                    if (!statusLine.equals("HTTP/1.1 200 OK")) {
267                            throw new IOException("Error status line: " + statusLine);
268                    }
269    
270                    boolean forceCloseSocket = false;
271    
272                    String line = null;
273    
274                    while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
275                            String[] headerKeyValuePair = StringUtil.split(
276                                    line, CharPool.COLON);
277    
278                            String headerName = headerKeyValuePair[0].trim();
279    
280                            headerName = headerName.toLowerCase();
281    
282                            if (headerName.equals("connection")) {
283                                    String headerValue = headerKeyValuePair[1].trim();
284    
285                                    headerValue = headerValue.toLowerCase();
286    
287                                    if (headerValue.equals("close")) {
288                                            forceCloseSocket = true;
289                                    }
290                            }
291                    }
292    
293                    return forceCloseSocket;
294            }
295    
296            protected void returnSocket(Socket socket, boolean forceCloseSocket) {
297                    boolean pooled = false;
298    
299                    if (!forceCloseSocket && socket.isConnected() &&
300                            !socket.isInputShutdown() && !socket.isOutputShutdown()) {
301    
302                            pooled = socketBlockingQueue.offer(socket);
303                    }
304    
305                    if (!pooled) {
306                            try {
307                                    socket.close();
308                            }
309                            catch (IOException ioe) {
310                                    if (_log.isWarnEnabled()) {
311                                            _log.warn(ioe, ioe);
312                                    }
313                            }
314                    }
315            }
316    
317            protected static final String MAPPING_PATTERN = "/acceptor";
318    
319            protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
320    
321            protected final byte[] httpServletRequestContent;
322            protected final RegistrationReference registrationReference;
323            protected final SocketAddress socketAddress;
324            protected final BlockingQueue<Socket> socketBlockingQueue;
325    
326            private static Log _log = LogFactoryUtil.getLog(HttpClientSPIAgent.class);
327    
328    }