001
014
015 package com.liferay.portal.resiliency.spi.agent;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
021 import com.liferay.portal.kernel.resiliency.spi.SPI;
022 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
023 import com.liferay.portal.kernel.resiliency.spi.agent.AcceptorServlet;
024 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
025 import com.liferay.portal.kernel.servlet.BufferCacheServletResponse;
026 import com.liferay.portal.kernel.servlet.ReadOnlyServletResponse;
027 import com.liferay.portal.kernel.util.CharPool;
028 import com.liferay.portal.kernel.util.InetAddressUtil;
029 import com.liferay.portal.kernel.util.StringUtil;
030 import com.liferay.portal.kernel.util.WebKeys;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.io.DataInput;
034 import java.io.DataInputStream;
035 import java.io.IOException;
036 import java.io.InputStream;
037 import java.io.OutputStream;
038
039 import java.net.InetSocketAddress;
040 import java.net.Socket;
041 import java.net.SocketAddress;
042 import java.net.UnknownHostException;
043
044 import java.nio.charset.Charset;
045
046 import java.util.Iterator;
047 import java.util.concurrent.ArrayBlockingQueue;
048 import java.util.concurrent.BlockingQueue;
049
050 import javax.servlet.http.HttpServletRequest;
051 import javax.servlet.http.HttpServletResponse;
052
053
056 public class HttpClientSPIAgent implements SPIAgent {
057
058 public HttpClientSPIAgent(
059 SPIConfiguration spiConfiguration,
060 RegistrationReference registrationReference)
061 throws UnknownHostException {
062
063 this.registrationReference = registrationReference;
064
065 socketAddress = new InetSocketAddress(
066 InetAddressUtil.getLoopbackInetAddress(),
067 spiConfiguration.getConnectorPort());
068 socketBlockingQueue = new ArrayBlockingQueue<Socket>(
069 PropsValues.PORTAL_RESILIENCY_SPI_AGENT_CLIENT_POOL_MAX_SIZE);
070
071 String httpServletRequestContentString =
072 "POST " + SPI_AGENT_CONTEXT_PATH + MAPPING_PATTERN +
073 " HTTP/1.1\r\nHost: localhost:" +
074 spiConfiguration.getConnectorPort() + "\r\n" +
075 "Content-Length: 8\r\n\r\n";
076
077 httpServletRequestContent = httpServletRequestContentString.getBytes(
078 Charset.forName("US-ASCII"));
079 }
080
081 @Override
082 public void destroy() {
083 Iterator<Socket> iterator = socketBlockingQueue.iterator();
084
085 while (iterator.hasNext()) {
086 Socket socket = iterator.next();
087
088 iterator.remove();
089
090 try {
091 socket.close();
092 }
093 catch (IOException ioe) {
094 if (_log.isWarnEnabled()) {
095 _log.warn(ioe, ioe);
096 }
097 }
098 }
099 }
100
101 @Override
102 public void init(SPI spi) throws PortalResiliencyException {
103 try {
104 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
105
106 spi.addServlet(
107 SPI_AGENT_CONTEXT_PATH, spiConfiguration.getBaseDir(),
108 MAPPING_PATTERN, AcceptorServlet.class.getName());
109 }
110 catch (Exception e) {
111 throw new PortalResiliencyException(e);
112 }
113 }
114
115 @Override
116 public HttpServletRequest prepareRequest(HttpServletRequest request)
117 throws IOException {
118
119 SPIAgentRequest spiAgentRequest = SPIAgentRequest.readFrom(
120 request.getInputStream());
121
122 HttpServletRequest spiAgentHttpServletRequest =
123 spiAgentRequest.populateRequest(request);
124
125 spiAgentHttpServletRequest.setAttribute(
126 WebKeys.SPI_AGENT_REQUEST, spiAgentRequest);
127
128 return spiAgentHttpServletRequest;
129 }
130
131 @Override
132 public HttpServletResponse prepareResponse(
133 HttpServletRequest request, HttpServletResponse response) {
134
135 HttpServletResponse spiAgentHttpServletResponse =
136 new BufferCacheServletResponse(
137 new ReadOnlyServletResponse(response));
138
139 request.setAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE, response);
140 request.setAttribute(
141 WebKeys.SPI_AGENT_RESPONSE, new SPIAgentResponse());
142
143 return spiAgentHttpServletResponse;
144 }
145
146 @Override
147 public void service(
148 HttpServletRequest request, HttpServletResponse response)
149 throws PortalResiliencyException {
150
151 Socket socket = null;
152
153 try {
154 socket = borrowSocket();
155
156 SPIAgentRequest spiAgentRequest = new SPIAgentRequest(request);
157
158 OutputStream outputStream = socket.getOutputStream();
159
160 outputStream.write(httpServletRequestContent);
161
162 spiAgentRequest.writeTo(registrationReference, outputStream);
163
164 InputStream inputStream = socket.getInputStream();
165
166 DataInputStream dataInputStream = new DataInputStream(inputStream);
167
168 boolean forceCloseSocket = consumeHttpResponseHead(dataInputStream);
169
170 SPIAgentResponse spiAgentResponse = SPIAgentResponse.readFrom(
171 dataInputStream);
172
173 spiAgentResponse.populate(request, response);
174
175 returnSocket(socket, forceCloseSocket);
176
177 socket = null;
178 }
179 catch (IOException ioe) {
180 throw new PortalResiliencyException(ioe);
181 }
182 finally {
183 if (socket != null) {
184 try {
185 socket.close();
186 }
187 catch (IOException ioe) {
188 if (_log.isWarnEnabled()) {
189 _log.warn(ioe, ioe);
190 }
191 }
192 }
193 }
194 }
195
196 @Override
197 public void transferResponse(
198 HttpServletRequest request, HttpServletResponse response,
199 Exception exception)
200 throws IOException {
201
202 request.removeAttribute(WebKeys.SPI_AGENT_REQUEST);
203
204 SPIAgentResponse spiAgentResponse =
205 (SPIAgentResponse)request.getAttribute(WebKeys.SPI_AGENT_RESPONSE);
206
207 request.removeAttribute(WebKeys.SPI_AGENT_RESPONSE);
208
209 if (exception != null) {
210 spiAgentResponse.setException(exception);
211 }
212 else {
213 BufferCacheServletResponse bufferCacheServletResponse =
214 (BufferCacheServletResponse)response;
215
216 spiAgentResponse.captureResponse(
217 request, bufferCacheServletResponse);
218 }
219
220 HttpServletResponse originalResponse =
221 (HttpServletResponse)request.getAttribute(
222 WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
223
224 request.removeAttribute(WebKeys.SPI_AGENT_ORIGINAL_RESPONSE);
225
226 originalResponse.setContentLength(8);
227
228 spiAgentResponse.writeTo(
229 registrationReference, originalResponse.getOutputStream());
230 }
231
232 protected Socket borrowSocket() throws IOException {
233 Socket socket = socketBlockingQueue.poll();
234
235 if (socket != null) {
236 if (socket.isClosed() || !socket.isConnected() ||
237 socket.isInputShutdown() || socket.isOutputShutdown()) {
238
239 try {
240 socket.close();
241 }
242 catch (IOException ioe) {
243 if (_log.isWarnEnabled()) {
244 _log.warn(ioe, ioe);
245 }
246 }
247
248 socket = null;
249 }
250 }
251
252 if (socket == null) {
253 socket = new Socket();
254
255 socket.connect(socketAddress);
256 }
257
258 return socket;
259 }
260
261 protected boolean consumeHttpResponseHead(DataInput dataInput)
262 throws IOException {
263
264 String statusLine = dataInput.readLine();
265
266 if (!statusLine.equals("HTTP/1.1 200 OK")) {
267 throw new IOException("Error status line: " + statusLine);
268 }
269
270 boolean forceCloseSocket = false;
271
272 String line = null;
273
274 while (((line = dataInput.readLine()) != null) && (line.length() > 0)) {
275 String[] headerKeyValuePair = StringUtil.split(
276 line, CharPool.COLON);
277
278 String headerName = headerKeyValuePair[0].trim();
279
280 headerName = headerName.toLowerCase();
281
282 if (headerName.equals("connection")) {
283 String headerValue = headerKeyValuePair[1].trim();
284
285 headerValue = headerValue.toLowerCase();
286
287 if (headerValue.equals("close")) {
288 forceCloseSocket = true;
289 }
290 }
291 }
292
293 return forceCloseSocket;
294 }
295
296 protected void returnSocket(Socket socket, boolean forceCloseSocket) {
297 boolean pooled = false;
298
299 if (!forceCloseSocket && socket.isConnected() &&
300 !socket.isInputShutdown() && !socket.isOutputShutdown()) {
301
302 pooled = socketBlockingQueue.offer(socket);
303 }
304
305 if (!pooled) {
306 try {
307 socket.close();
308 }
309 catch (IOException ioe) {
310 if (_log.isWarnEnabled()) {
311 _log.warn(ioe, ioe);
312 }
313 }
314 }
315 }
316
317 protected static final String MAPPING_PATTERN = "/acceptor";
318
319 protected static final String SPI_AGENT_CONTEXT_PATH = "/spi_agent";
320
321 protected final byte[] httpServletRequestContent;
322 protected final RegistrationReference registrationReference;
323 protected final SocketAddress socketAddress;
324 protected final BlockingQueue<Socket> socketBlockingQueue;
325
326 private static Log _log = LogFactoryUtil.getLog(HttpClientSPIAgent.class);
327
328 }