001
014
015 package com.liferay.portal.resiliency.spi.agent;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021 import com.liferay.portal.kernel.resiliency.spi.SPI;
022 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023 import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025 import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026 import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027 import com.liferay.portal.kernel.util.CharPool;
028 import com.liferay.portal.kernel.util.InetAddressUtil;
029 import com.liferay.portal.kernel.util.StringUtil;
030 import com.liferay.portal.kernel.util.WebKeys;
031 import com.liferay.portal.model.Portlet;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.io.DataInput;
035 import java.io.DataInputStream;
036 import java.io.File;
037 import java.io.IOException;
038 import java.io.InputStream;
039 import java.io.OutputStream;
040
041 import java.net.InetSocketAddress;
042 import java.net.Socket;
043 import java.net.SocketAddress;
044 import java.net.UnknownHostException;
045
046 import java.nio.charset.Charset;
047
048 import java.util.Iterator;
049 import java.util.concurrent.ArrayBlockingQueue;
050 import java.util.concurrent.BlockingQueue;
051
052 import javax.servlet.http.HttpServletRequest;
053 import javax.servlet.http.HttpServletResponse;
054
055
058 public class HttpClientSPIAgent implements SPIAgent {
059
060 public HttpClientSPIAgent(
061 SPIConfiguration spiConfiguration,
062 RegistrationReference registrationReference)
063 throws UnknownHostException {
064
065 this.registrationReference = registrationReference;
066
067 socketAddress = new InetSocketAddress(
068 InetAddressUtil.getLoopbackInetAddress(),
069 spiConfiguration.getConnectorPort());
070 socketBlockingQueue = new ArrayBlockingQueue<>(
071 PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
072
073 String httpServletRequestContentString =
074 "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
075 " HTTP/1.1\r\nHost: localhost:" +
076 spiConfiguration.getConnectorPort() + "\r\n" +
077 "Content-Length: 8\r\n\r\n";
078
079 httpServletRequestContent = httpServletRequestContentString.getBytes(
080 Charset.forName("US-ASCII"));
081 }
082
083 @Override
084 public void destroy() {
085 Iterator<Socket> iterator = socketBlockingQueue.iterator();
086
087 while (iterator.hasNext()) {
088 Socket socket = iterator.next();
089
090 iterator.remove();
091
092 try {
093 socket.close();
094 }
095 catch (IOException ioe) {
096 if (_log.isWarnEnabled()) {
097 _log.warn(ioe, ioe);
098 }
099 }
100 }
101 }
102
103 @Override
104 public void init(SPI spi) throws PortalResiliencyException {
105 try {
106 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
107
108 spi.addServlet(
109 SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
110 MAPPING_PATTERN, AcceptorServlet.class.getName());
111 }
112 catch (Exception e) {
113 throw new PortalResiliencyException(e);
114 }
115 }
116
117 @Override
118 public HttpServletRequest prepareRequest(HttpServletRequest request)
119 throws IOException {
120
121 SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
122 request.getInputStream());
123
124 HttpServletRequest spiAgentHttpServletRequest =
125 spiAgentRequest.populateRequest(request);
126
127 spiAgentHttpServletRequest.setAttribute(
128 WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
129
130 return spiAgentHttpServletRequest;
131 }
132
133 @Override
134 public HttpServletResponse prepareResponse(
135 HttpServletRequest request, HttpServletResponse response) {
136
137 HttpServletResponse spiAgentHttpServletResponse =
138 new BufferCacheServletResponse(
139 new ReadOnlyServletResponse(response));
140
141 request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
142
143 Portlet portlet = (Portlet)request.getAttribute(
144 WebKeys.SPI_AGENT_PORTLET);
145
146 request.setAttribute(
147 WebKeys.SPI_AGENT_RESPONSE,
148 new SPIAgentResponse(portlet.getContextName()));
149
150 return spiAgentHttpServletResponse;
151 }
152
153 @Override
154 public void service(
155 HttpServletRequest request, HttpServletResponse response)
156 throws PortalResiliencyException {
157
158 Socket socket = null;
159
160 try {
161 socket = borrowSocket();
162
163 SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
164
165 OutputStream outputStream = socket.getOutputStream();
166
167 outputStream.write(httpServletRequestContent);
168
169 spiAgentRequest.writeTo(registrationReference, outputStream);
170
171 InputStream inputStream = socket.getInputStream();
172
173 DataInputStream dataInputStream = new DataInputStream(inputStream);
174
175 boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
176
177 SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
178 dataInputStream);
179
180 spiAgentResponse.populate(request, response);
181
182 returnSocket(socket, forceCloseSocket);
183
184 socket = null;
185 }
186 catch (IOException ioe) {
187 throw new PortalResiliencyException(ioe);
188 }
189 finally {
190 if (socket != null) {
191 try {
192 socket.close();
193 }
194 catch (IOException ioe) {
195 if (_log.isWarnEnabled()) {
196 _log.warn(ioe, ioe);
197 }
198 }
199 }
200 }
201 }
202
203 @Override
204 public void transferResponse(
205 HttpServletRequest request, HttpServletResponse response,
206 Exception exception)
207 throws IOException {
208
209 SPIAgentRequest spiAgentRequest = (SPIAgentRequest)request.getAttribute(
210 WebKeys.SPI_AGENT_REQUEST);
211
212 request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
213
214 File requestBodyFile = spiAgentRequest.requestBodyFile;
215
216 if (requestBodyFile != null) {
217 if (!requestBodyFile.delete()) {
218 requestBodyFile.deleteOnExit();
219 }
220 }
221
222 SPIAgentResponse spiAgentResponse =
223 (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
224
225 request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
226
227 if (exception != null) {
228 spiAgentResponse.setException(exception);
229 }
230 else {
231 BufferCacheServletResponse bufferCacheServletResponse =
232 (BufferCacheServletResponse)response;
233
234 spiAgentResponse.captureResponse(
235 request, bufferCacheServletResponse);
236 }
237
238 HttpServletResponse originalResponse =
239 (HttpServletResponse)request.getAttribute(
240 WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
241
242 request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
243
244 originalResponse.setContentLength(8);
245
246 spiAgentResponse.writeTo(
247 registrationReference, originalResponse.getOutputStream());
248 }
249
250 protected Socket borrowSocket() throws IOException {
251 Socket socket = socketBlockingQueue.poll();
252
253 if (socket != null) {
254 if (socket.isClosed() || !socket.isConnected() ||
255 socket.isInputShutdown() || socket.isOutputShutdown()) {
256
257 try {
258 socket.close();
259 }
260 catch (IOException ioe) {
261 if (_log.isWarnEnabled()) {
262 _log.warn(ioe, ioe);
263 }
264 }
265
266 socket = null;
267 }
268 }
269
270 if (socket == null) {
271 socket = new Socket();
272
273 socket.connect(socketAddress);
274 }
275
276 return socket;
277 }
278
279 protected boolean consumeHttpResponseHead(DataInput dataInput)
280 throws IOException {
281
282 String statusLine = dataInput.readLine();
283
284 if (!statusLine.equals("HTTP/1.1 200 OK")) {
285 throw new IOException("Error status line: " + statusLine);
286 }
287
288 boolean forceCloseSocket = false;
289
290 String line = null;
291
292 while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
293 String[] headerKeyValuePair = StringUtil.split(
294 line, CharPool.COLON);
295
296 String headerName = headerKeyValuePair[0].trim();
297
298 headerName = StringUtil.toLowerCase(headerName);
299
300 if (headerName.equals("connection")) {
301 String headerValue = headerKeyValuePair[1].trim();
302
303 headerValue = StringUtil.toLowerCase(headerValue);
304
305 if (headerValue.equals("close")) {
306 forceCloseSocket = true;
307 }
308 }
309 }
310
311 return forceCloseSocket;
312 }
313
314 protected void returnSocket(Socket socket, boolean forceCloseSocket) {
315 boolean pooled = false;
316
317 if (!forceCloseSocket && socket.isConnected() &&
318 !socket.isInputShutdown() && !socket.isOutputShutdown()) {
319
320 pooled = socketBlockingQueue.offer(socket);
321 }
322
323 if (!pooled) {
324 try {
325 socket.close();
326 }
327 catch (IOException ioe) {
328 if (_log.isWarnEnabled()) {
329 _log.warn(ioe, ioe);
330 }
331 }
332 }
333 }
334
335 protected static final String MAPPING_PATTERN = "/acceptor";
336
337 protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
338
339 protected final byte[] httpServletRequestContent;
340 protected final RegistrationReference registrationReference;
341 protected final SocketAddress socketAddress;
342 protected final BlockingQueue<Socket> socketBlockingQueue;
343
344 private static final Log _log = LogFactoryUtil.getLog(
345 HttpClientSPIAgent.class);
346
347 }