001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.poller;
016    
017    import com.liferay.portal.kernel.exception.SystemException;
018    import com.liferay.portal.kernel.json.JSONFactoryUtil;
019    import com.liferay.portal.kernel.json.JSONObject;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.DestinationNames;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBusUtil;
025    import com.liferay.portal.kernel.messaging.MessageListener;
026    import com.liferay.portal.kernel.poller.DefaultPollerResponse;
027    import com.liferay.portal.kernel.poller.PollerHeader;
028    import com.liferay.portal.kernel.poller.PollerProcessor;
029    import com.liferay.portal.kernel.poller.PollerRequest;
030    import com.liferay.portal.kernel.poller.PollerResponse;
031    import com.liferay.portal.kernel.util.GetterUtil;
032    import com.liferay.portal.kernel.util.StringPool;
033    import com.liferay.portal.kernel.util.StringUtil;
034    import com.liferay.portal.kernel.util.Validator;
035    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036    import com.liferay.portal.model.BrowserTracker;
037    import com.liferay.portal.model.Company;
038    import com.liferay.portal.service.BrowserTrackerLocalServiceUtil;
039    import com.liferay.portal.service.CompanyLocalServiceUtil;
040    import com.liferay.util.Encryptor;
041    
042    import java.util.ArrayList;
043    import java.util.HashMap;
044    import java.util.HashSet;
045    import java.util.List;
046    import java.util.Map;
047    import java.util.Set;
048    
049    /**
050     * @author Michael C. Han
051     * @author Brian Wing Shun Chan
052     * @author Edward Han
053     */
054    public class PollerRequestHandlerImpl
055            implements PollerRequestHandler, MessageListener {
056    
057            public PollerHeader getPollerHeader(String pollerRequestString) {
058                    if (Validator.isNull(pollerRequestString)) {
059                            return null;
060                    }
061    
062                    Map<String, Object>[] pollerRequestChunks =
063                            parsePollerRequestParameters(pollerRequestString);
064    
065                    return parsePollerRequestHeader(pollerRequestChunks);
066            }
067    
068            public JSONObject processRequest(String path, String pollerRequestString)
069                    throws Exception {
070    
071                    if (Validator.isNull(pollerRequestString)) {
072                            return null;
073                    }
074    
075                    Map<String, Object>[] pollerRequestChunks =
076                            parsePollerRequestParameters(pollerRequestString);
077    
078                    PollerHeader pollerHeader = parsePollerRequestHeader(
079                            pollerRequestChunks);
080    
081                    if (pollerHeader == null) {
082                            return null;
083                    }
084    
085                    boolean receiveRequest = isReceiveRequest(path);
086    
087                    String pollerSessionId = getPollerSessionId(pollerHeader);
088    
089                    PollerSession pollerSession = null;
090    
091                    synchronized (_pollerSessions) {
092                            pollerSession = _pollerSessions.get(pollerSessionId);
093    
094                            if ((pollerSession == null) && receiveRequest) {
095                                    pollerSession = new PollerSession(pollerSessionId);
096    
097                                    _pollerSessions.put(pollerSessionId, pollerSession);
098                            }
099                    }
100    
101                    List<PollerRequest> pollerRequests = createPollerRequests(
102                            pollerHeader, pollerRequestChunks, receiveRequest);
103    
104                    executePollerRequests(pollerSession, pollerRequests);
105    
106                    if (receiveRequest) {
107                            return createPollerResponseHeader(pollerHeader);
108                    }
109                    else {
110                            return null;
111                    }
112            }
113    
114            public void receive(Message message) {
115                    Object messagePayload = message.getPayload();
116    
117                    if (!(messagePayload instanceof PollerResponse)) {
118                            return;
119                    }
120    
121                    PollerResponse pollerResponse = (PollerResponse) messagePayload;
122    
123                    PollerHeader pollerHeader = pollerResponse.getPollerHeader();
124    
125                    String pollerSessionId = getPollerSessionId(pollerHeader);
126    
127                    synchronized (_pollerSessions) {
128                            PollerSession pollerSession = _pollerSessions.get(
129                                    pollerSessionId);
130    
131                            if ((pollerSession != null) &&
132                                    pollerSession.completePortletProcessing(
133                                            pollerResponse.getPortletId(), message.getResponseId())) {
134    
135                                    _pollerSessions.remove(pollerSessionId);
136                            }
137                    }
138            }
139    
140            protected PollerRequest createPollerRequest(
141                            boolean receiveRequest, PollerHeader pollerHeader, String portletId)
142                    throws Exception {
143    
144                    return createPollerRequest(
145                            receiveRequest, pollerHeader, portletId,
146                            new HashMap<String, String>(), null);
147            }
148    
149            protected PollerRequest createPollerRequest(
150                            boolean receiveRequest, PollerHeader pollerHeader, String portletId,
151                            Map<String, String> parameterMap, String chunkId)
152                    throws Exception {
153    
154                    PollerProcessor pollerProcessor =
155                            PollerProcessorUtil.getPollerProcessor(portletId);
156    
157                    if (pollerProcessor == null) {
158                            if (_log.isWarnEnabled()) {
159                                    _log.warn(
160                                            "Poller processor not found for portlet " + portletId);
161                            }
162    
163                            return null;
164                    }
165    
166                    return new PollerRequest(
167                            pollerHeader, portletId, parameterMap, chunkId, receiveRequest);
168            }
169    
170            protected List<PollerRequest> createPollerRequests(
171                            PollerHeader pollerHeader,
172                            Map<String, Object>[] pollerRequestChunks, boolean receiveRequest)
173                    throws Exception {
174    
175                    String[] portletIds = pollerHeader.getPortletIds();
176    
177                    List<PollerRequest> pollerRequests = new ArrayList<PollerRequest>(
178                            portletIds.length);
179    
180                    Set<String> receiveRequestPortletIds = null;
181    
182                    if (receiveRequest) {
183                            receiveRequestPortletIds = new HashSet<String>(
184                                    (int)(pollerRequestChunks.length / 0.75) + 1);
185                    }
186    
187                    for (int i = 1; i < pollerRequestChunks.length; i++) {
188                            Map<String, Object> pollerRequestChunk = pollerRequestChunks[i];
189    
190                            String portletId = (String)pollerRequestChunk.get("portletId");
191                            Map<String, String> parameterMap = parseData(pollerRequestChunk);
192                            String chunkId = (String)pollerRequestChunk.get("chunkId");
193    
194                            try {
195                                    PollerRequest pollerRequest = createPollerRequest(
196                                            receiveRequest, pollerHeader, portletId, parameterMap,
197                                            chunkId);
198    
199                                    pollerRequests.add(pollerRequest);
200    
201                                    if (receiveRequest) {
202                                            receiveRequestPortletIds.add(portletId);
203                                    }
204                            }
205                            catch (Exception e) {
206                                    _log.error(e, e);
207                            }
208                    }
209    
210                    if (receiveRequest) {
211                            for (String portletId : portletIds) {
212                                    if (receiveRequestPortletIds.contains(portletId)) {
213                                            continue;
214                                    }
215    
216                                    try {
217                                            PollerRequest pollerRequest = createPollerRequest(
218                                                    receiveRequest, pollerHeader, portletId);
219    
220                                            pollerRequests.add(pollerRequest);
221                                    }
222                                    catch (Exception e) {
223                                            _log.error(e, e);
224                                    }
225                            }
226                    }
227    
228                    return pollerRequests;
229            }
230    
231            protected JSONObject createPollerResponseHeader(PollerHeader pollerHeader)
232                    throws SystemException {
233    
234                    if (pollerHeader == null) {
235                            return null;
236                    }
237    
238                    boolean suspendPolling = false;
239    
240                    if (pollerHeader.isStartPolling()) {
241                            BrowserTrackerLocalServiceUtil.updateBrowserTracker(
242                                    pollerHeader.getUserId(), pollerHeader.getBrowserKey());
243                    }
244                    else {
245                            BrowserTracker browserTracker =
246                                    BrowserTrackerLocalServiceUtil.getBrowserTracker(
247                                            pollerHeader.getUserId(), pollerHeader.getBrowserKey());
248    
249                            if (browserTracker.getBrowserKey() !=
250                                    pollerHeader.getBrowserKey()) {
251    
252                                    suspendPolling = true;
253                            }
254                    }
255    
256                    JSONObject pollerResponseHeaderJSONObject =
257                            JSONFactoryUtil.createJSONObject();
258    
259                    pollerResponseHeaderJSONObject.put(
260                            "userId", pollerHeader.getUserId());
261                    pollerResponseHeaderJSONObject.put(
262                            "initialRequest", pollerHeader.isInitialRequest());
263                    pollerResponseHeaderJSONObject.put("suspendPolling", suspendPolling);
264    
265                    return pollerResponseHeaderJSONObject;
266            }
267    
268            protected void executePollerRequests(
269                    PollerSession pollerSession, List<PollerRequest> pollerRequests) {
270    
271                    for (PollerRequest pollerRequest : pollerRequests) {
272                            PollerRequestResponsePair pollerRequestResponsePair =
273                                    new PollerRequestResponsePair(pollerRequest);
274    
275                            String responseId = null;
276    
277                            if (pollerRequest.isReceiveRequest()) {
278                                    responseId = PortalUUIDUtil.generate();
279    
280                                    PollerResponse pollerResponse = new DefaultPollerResponse(
281                                            pollerRequest.getPollerHeader(),
282                                            pollerRequest.getPortletId(),
283                                            pollerRequest.getChunkId());
284    
285                                    pollerRequestResponsePair.setPollerResponse(pollerResponse);
286    
287                                    if (!pollerSession.beginPortletProcessing(
288                                                    pollerRequestResponsePair, responseId)) {
289    
290                                            continue;
291                                    }
292                            }
293    
294                            Message message = new Message();
295    
296                            message.setPayload(pollerRequestResponsePair);
297    
298                            if (pollerRequest.isReceiveRequest()) {
299                                    message.setResponseId(responseId);
300    
301                                    message.setResponseDestinationName(
302                                            DestinationNames.POLLER_RESPONSE);
303                            }
304    
305                            MessageBusUtil.sendMessage(DestinationNames.POLLER, message);
306                    }
307            }
308    
309            protected String fixPollerRequestString(String pollerRequestString) {
310                    if (Validator.isNull(pollerRequestString)) {
311                            return null;
312                    }
313    
314                    return StringUtil.replace(
315                            pollerRequestString,
316                            new String[] {
317                                    StringPool.OPEN_CURLY_BRACE,
318                                    StringPool.CLOSE_CURLY_BRACE,
319                                    _ESCAPED_OPEN_CURLY_BRACE,
320                                    _ESCAPED_CLOSE_CURLY_BRACE
321                            },
322                            new String[] {
323                                    _OPEN_HASH_MAP_WRAPPER,
324                                    StringPool.DOUBLE_CLOSE_CURLY_BRACE,
325                                    StringPool.OPEN_CURLY_BRACE,
326                                    StringPool.CLOSE_CURLY_BRACE
327                            });
328            }
329    
330            protected String getPollerSessionId(PollerHeader pollerHeader) {
331                    return String.valueOf(pollerHeader.getUserId());
332            }
333    
334            protected long getUserId(long companyId, String userIdString) {
335                    long userId = 0;
336    
337                    try {
338                            Company company = CompanyLocalServiceUtil.getCompany(companyId);
339    
340                            userId = GetterUtil.getLong(
341                                    Encryptor.decrypt(company.getKeyObj(), userIdString));
342                    }
343                    catch (Exception e) {
344                            _log.error(
345                                    "Invalid credentials for company id " + companyId +
346                                            " and user id " + userIdString);
347                    }
348    
349                    return userId;
350            }
351    
352            protected boolean isReceiveRequest(String path) {
353                    if ((path != null) && path.endsWith(_PATH_RECEIVE)) {
354                            return true;
355                    }
356                    else {
357                            return false;
358                    }
359            }
360    
361            protected Map<String, String> parseData(
362                    Map<String, Object> pollerRequestChunk)
363                    throws Exception {
364    
365                    Map<String, Object> oldParameterMap =
366                            (Map<String, Object>)pollerRequestChunk.get("data");
367    
368                    Map<String, String> newParameterMap = new HashMap<String, String>();
369    
370                    if (oldParameterMap == null) {
371                            return newParameterMap;
372                    }
373    
374                    for (Map.Entry<String, Object> entry : oldParameterMap.entrySet()) {
375                            newParameterMap.put(
376                                    entry.getKey(), String.valueOf(entry.getValue()));
377                    }
378    
379                    return newParameterMap;
380            }
381    
382            protected PollerHeader parsePollerRequestHeader(
383                    Map<String, Object>[] pollerRequestChunks) {
384    
385                    if ((pollerRequestChunks == null) || (pollerRequestChunks.length < 1)) {
386                            return null;
387                    }
388    
389                    Map<String, Object> pollerRequestChunk = pollerRequestChunks[0];
390    
391                    long companyId = GetterUtil.getLong(
392                            String.valueOf(pollerRequestChunk.get("companyId")));
393                    String userIdString = GetterUtil.getString(
394                            String.valueOf(pollerRequestChunk.get("userId")));
395                    long browserKey = GetterUtil.getLong(
396                            String.valueOf(pollerRequestChunk.get("browserKey")));
397                    String[] portletIds = StringUtil.split(
398                            String.valueOf(pollerRequestChunk.get("portletIds")));
399                    boolean initialRequest = GetterUtil.getBoolean(
400                            String.valueOf(pollerRequestChunk.get("initialRequest")));
401                    boolean startPolling = GetterUtil.getBoolean(
402                            String.valueOf(pollerRequestChunk.get("startPolling")));
403    
404                    long userId = getUserId(companyId, userIdString);
405    
406                    if (userId == 0) {
407                            return null;
408                    }
409    
410                    return new PollerHeader(
411                            companyId, userId, browserKey, portletIds, initialRequest,
412                            startPolling);
413            }
414    
415            protected Map<String, Object>[] parsePollerRequestParameters(
416                    String pollerRequestString) {
417    
418                    String fixedPollerRequestString = fixPollerRequestString(
419                            pollerRequestString);
420    
421                    return (Map<String, Object>[])JSONFactoryUtil.deserialize(
422                            fixedPollerRequestString);
423            }
424    
425            private static final String _ESCAPED_CLOSE_CURLY_BRACE =
426                    "[$CLOSE_CURLY_BRACE$]";
427    
428            private static final String _ESCAPED_OPEN_CURLY_BRACE =
429                    "[$OPEN_CURLY_BRACE$]";
430    
431            private static final String _OPEN_HASH_MAP_WRAPPER =
432                    "{\"javaClass\":\"java.util.HashMap\",\"map\":{";
433    
434            private static final String _PATH_RECEIVE = "/receive";
435    
436            private static Log _log = LogFactoryUtil.getLog(
437                    PollerRequestHandlerImpl.class);
438    
439            private Map<String, PollerSession> _pollerSessions =
440                    new HashMap<String, PollerSession>();
441    
442    }