001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.resiliency.spi.agent;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021    import com.liferay.portal.kernel.resiliency.spi.SPI;
022    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023    import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025    import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026    import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027    import com.liferay.portal.kernel.util.CharPool;
028    import com.liferay.portal.kernel.util.InetAddressUtil;
029    import com.liferay.portal.kernel.util.StringUtil;
030    import com.liferay.portal.kernel.util.WebKeys;
031    import com.liferay.portal.model.Portlet;
032    import com.liferay.portal.util.PropsValues;
033    
034    import java.io.DataInput;
035    import java.io.DataInputStream;
036    import java.io.File;
037    import java.io.IOException;
038    import java.io.InputStream;
039    import java.io.OutputStream;
040    
041    import java.net.InetSocketAddress;
042    import java.net.Socket;
043    import java.net.SocketAddress;
044    import java.net.UnknownHostException;
045    
046    import java.nio.charset.Charset;
047    
048    import java.util.Iterator;
049    import java.util.concurrent.ArrayBlockingQueue;
050    import java.util.concurrent.BlockingQueue;
051    
052    import javax.servlet.http.HttpServletRequest;
053    import javax.servlet.http.HttpServletResponse;
054    
055    /**
056     * @author Shuyang Zhou
057     */
058    public class HttpClientSPIAgent implements SPIAgent {
059    
060            public HttpClientSPIAgent(
061                            SPIConfiguration spiConfiguration,
062                            RegistrationReference registrationReference)
063                    throws UnknownHostException {
064    
065                    this.registrationReference = registrationReference;
066    
067                    socketAddress = new InetSocketAddress(
068                            InetAddressUtil.getLoopbackInetAddress(),
069                            spiConfiguration.getConnectorPort());
070                    socketBlockingQueue = new ArrayBlockingQueue<>(
071                            PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
072    
073                    String httpServletRequestContentString =
074                            "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
075                                    " HTTP/1.1\r\nHost: localhost:" +
076                                            spiConfiguration.getConnectorPort() + "\r\n" +
077                                                    "Content-Length: 8\r\n\r\n";
078    
079                    httpServletRequestContent = httpServletRequestContentString.getBytes(
080                            Charset.forName("US-ASCII"));
081            }
082    
083            @Override
084            public void destroy() {
085                    Iterator<Socket> iterator = socketBlockingQueue.iterator();
086    
087                    while (iterator.hasNext()) {
088                            Socket socket = iterator.next();
089    
090                            iterator.remove();
091    
092                            try {
093                                    socket.close();
094                            }
095                            catch (IOException ioe) {
096                                    if (_log.isWarnEnabled()) {
097                                            _log.warn(ioe, ioe);
098                                    }
099                            }
100                    }
101            }
102    
103            @Override
104            public void init(SPI spi) throws PortalResiliencyException {
105                    try {
106                            SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
107    
108                            spi.addServlet(
109                                    SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
110                                    MAPPING_PATTERN, AcceptorServlet.class.getName());
111                    }
112                    catch (Exception e) {
113                            throw new PortalResiliencyException(e);
114                    }
115            }
116    
117            @Override
118            public HttpServletRequest prepareRequest(HttpServletRequest request)
119                    throws IOException {
120    
121                    SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
122                            request.getInputStream());
123    
124                    HttpServletRequest spiAgentHttpServletRequest =
125                            spiAgentRequest.populateRequest(request);
126    
127                    spiAgentHttpServletRequest.setAttribute(
128                            WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
129    
130                    return spiAgentHttpServletRequest;
131            }
132    
133            @Override
134            public HttpServletResponse prepareResponse(
135                    HttpServletRequest request, HttpServletResponse response) {
136    
137                    HttpServletResponse spiAgentHttpServletResponse =
138                            new BufferCacheServletResponse(
139                                    new ReadOnlyServletResponse(response));
140    
141                    request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
142    
143                    Portlet portlet = (Portlet)request.getAttribute(
144                            WebKeys.SPI_AGENT_PORTLET);
145    
146                    request.setAttribute(
147                            WebKeys.SPI_AGENT_RESPONSE,
148                            new SPIAgentResponse(portlet.getContextName()));
149    
150                    return spiAgentHttpServletResponse;
151            }
152    
153            @Override
154            public void service(
155                            HttpServletRequest request, HttpServletResponse response)
156                    throws PortalResiliencyException {
157    
158                    Socket socket = null;
159    
160                    try {
161                            socket = borrowSocket();
162    
163                            SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
164    
165                            OutputStream outputStream = socket.getOutputStream();
166    
167                            outputStream.write(httpServletRequestContent);
168    
169                            spiAgentRequest.writeTo(registrationReference, outputStream);
170    
171                            InputStream inputStream = socket.getInputStream();
172    
173                            DataInputStream dataInputStream = new DataInputStream(inputStream);
174    
175                            boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
176    
177                            SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
178                                    dataInputStream);
179    
180                            spiAgentResponse.populate(request, response);
181    
182                            returnSocket(socket, forceCloseSocket);
183    
184                            socket = null;
185                    }
186                    catch (IOException ioe) {
187                            throw new PortalResiliencyException(ioe);
188                    }
189                    finally {
190                            if (socket != null) {
191                                    try {
192                                            socket.close();
193                                    }
194                                    catch (IOException ioe) {
195                                            if (_log.isWarnEnabled()) {
196                                                    _log.warn(ioe, ioe);
197                                            }
198                                    }
199                            }
200                    }
201            }
202    
203            @Override
204            public void transferResponse(
205                            HttpServletRequest request, HttpServletResponse response,
206                            Exception exception)
207                    throws IOException {
208    
209                    SPIAgentRequest spiAgentRequest = (SPIAgentRequest)request.getAttribute(
210                            WebKeys.SPI_AGENT_REQUEST);
211    
212                    request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
213    
214                    File requestBodyFile = spiAgentRequest.requestBodyFile;
215    
216                    if (requestBodyFile != null) {
217                            if (!requestBodyFile.delete()) {
218                                    requestBodyFile.deleteOnExit();
219                            }
220                    }
221    
222                    SPIAgentResponse spiAgentResponse =
223                            (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
224    
225                    request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
226    
227                    if (exception != null) {
228                            spiAgentResponse.setException(exception);
229                    }
230                    else {
231                            BufferCacheServletResponse bufferCacheServletResponse =
232                                    (BufferCacheServletResponse)response;
233    
234                            spiAgentResponse.captureResponse(
235                                    request, bufferCacheServletResponse);
236                    }
237    
238                    HttpServletResponse originalResponse =
239                            (HttpServletResponse)request.getAttribute(
240                                    WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
241    
242                    request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
243    
244                    originalResponse.setContentLength(8);
245    
246                    spiAgentResponse.writeTo(
247                            registrationReference, originalResponse.getOutputStream());
248            }
249    
250            protected Socket borrowSocket() throws IOException {
251                    Socket socket = socketBlockingQueue.poll();
252    
253                    if (socket != null) {
254                            if (socket.isClosed() || !socket.isConnected() ||
255                                    socket.isInputShutdown() || socket.isOutputShutdown()) {
256    
257                                    try {
258                                            socket.close();
259                                    }
260                                    catch (IOException ioe) {
261                                            if (_log.isWarnEnabled()) {
262                                                    _log.warn(ioe, ioe);
263                                            }
264                                    }
265    
266                                    socket = null;
267                            }
268                    }
269    
270                    if (socket == null) {
271                            socket = new Socket();
272    
273                            socket.connect(socketAddress);
274                    }
275    
276                    return socket;
277            }
278    
279            protected boolean consumeHttpResponseHead(DataInput dataInput)
280                    throws IOException {
281    
282                    String statusLine = dataInput.readLine();
283    
284                    if (!statusLine.equals("HTTP/1.1 200 OK")) {
285                            throw new IOException("Error status line: " + statusLine);
286                    }
287    
288                    boolean forceCloseSocket = false;
289    
290                    String line = null;
291    
292                    while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
293                            String[] headerKeyValuePair = StringUtil.split(
294                                    line, CharPool.COLON);
295    
296                            String headerName = headerKeyValuePair[0].trim();
297    
298                            headerName = StringUtil.toLowerCase(headerName);
299    
300                            if (headerName.equals("connection")) {
301                                    String headerValue = headerKeyValuePair[1].trim();
302    
303                                    headerValue = StringUtil.toLowerCase(headerValue);
304    
305                                    if (headerValue.equals("close")) {
306                                            forceCloseSocket = true;
307                                    }
308                            }
309                    }
310    
311                    return forceCloseSocket;
312            }
313    
314            protected void returnSocket(Socket socket, boolean forceCloseSocket) {
315                    boolean pooled = false;
316    
317                    if (!forceCloseSocket && socket.isConnected() &&
318                            !socket.isInputShutdown() && !socket.isOutputShutdown()) {
319    
320                            pooled = socketBlockingQueue.offer(socket);
321                    }
322    
323                    if (!pooled) {
324                            try {
325                                    socket.close();
326                            }
327                            catch (IOException ioe) {
328                                    if (_log.isWarnEnabled()) {
329                                            _log.warn(ioe, ioe);
330                                    }
331                            }
332                    }
333            }
334    
335            protected static final String MAPPING_PATTERN = "/acceptor";
336    
337            protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
338    
339            protected final byte[] httpServletRequestContent;
340            protected final RegistrationReference registrationReference;
341            protected final SocketAddress socketAddress;
342            protected final BlockingQueue<Socket> socketBlockingQueue;
343    
344            private static final Log _log = LogFactoryUtil.getLog(
345                    HttpClientSPIAgent.class);
346    
347    }