001
014
015 package com.liferay.portal.poller;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONFactoryUtil;
019 import com.liferay.portal.kernel.json.JSONObject;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.DestinationNames;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBusUtil;
025 import com.liferay.portal.kernel.messaging.MessageListener;
026 import com.liferay.portal.kernel.poller.DefaultPollerResponse;
027 import com.liferay.portal.kernel.poller.PollerHeader;
028 import com.liferay.portal.kernel.poller.PollerProcessor;
029 import com.liferay.portal.kernel.poller.PollerRequest;
030 import com.liferay.portal.kernel.poller.PollerResponse;
031 import com.liferay.portal.kernel.util.GetterUtil;
032 import com.liferay.portal.kernel.util.StringPool;
033 import com.liferay.portal.kernel.util.StringUtil;
034 import com.liferay.portal.kernel.util.Validator;
035 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036 import com.liferay.portal.model.BrowserTracker;
037 import com.liferay.portal.model.Company;
038 import com.liferay.portal.service.BrowserTrackerLocalServiceUtil;
039 import com.liferay.portal.service.CompanyLocalServiceUtil;
040 import com.liferay.util.Encryptor;
041
042 import java.util.ArrayList;
043 import java.util.HashMap;
044 import java.util.HashSet;
045 import java.util.List;
046 import java.util.Map;
047 import java.util.Set;
048
049
054 public class PollerRequestHandlerImpl
055 implements PollerRequestHandler, MessageListener {
056
057 public PollerHeader getPollerHeader(String pollerRequestString) {
058 if (Validator.isNull(pollerRequestString)) {
059 return null;
060 }
061
062 Map<String, Object>[] pollerRequestChunks =
063 parsePollerRequestParameters(pollerRequestString);
064
065 return parsePollerRequestHeader(pollerRequestChunks);
066 }
067
068 public JSONObject processRequest(String path, String pollerRequestString)
069 throws Exception {
070
071 if (Validator.isNull(pollerRequestString)) {
072 return null;
073 }
074
075 Map<String, Object>[] pollerRequestChunks =
076 parsePollerRequestParameters(pollerRequestString);
077
078 PollerHeader pollerHeader = parsePollerRequestHeader(
079 pollerRequestChunks);
080
081 if (pollerHeader == null) {
082 return null;
083 }
084
085 boolean receiveRequest = isReceiveRequest(path);
086
087 String pollerSessionId = getPollerSessionId(pollerHeader);
088
089 PollerSession pollerSession = null;
090
091 synchronized (_pollerSessions) {
092 pollerSession = _pollerSessions.get(pollerSessionId);
093
094 if ((pollerSession == null) && receiveRequest) {
095 pollerSession = new PollerSession(pollerSessionId);
096
097 _pollerSessions.put(pollerSessionId, pollerSession);
098 }
099 }
100
101 List<PollerRequest> pollerRequests = createPollerRequests(
102 pollerHeader, pollerRequestChunks, receiveRequest);
103
104 executePollerRequests(pollerSession, pollerRequests);
105
106 if (receiveRequest) {
107 return createPollerResponseHeader(pollerHeader);
108 }
109 else {
110 return null;
111 }
112 }
113
114 public void receive(Message message) {
115 Object messagePayload = message.getPayload();
116
117 if (!(messagePayload instanceof PollerResponse)) {
118 return;
119 }
120
121 PollerResponse pollerResponse = (PollerResponse) messagePayload;
122
123 PollerHeader pollerHeader = pollerResponse.getPollerHeader();
124
125 String pollerSessionId = getPollerSessionId(pollerHeader);
126
127 synchronized (_pollerSessions) {
128 PollerSession pollerSession = _pollerSessions.get(pollerSessionId);
129
130 if ((pollerSession != null) &&
131 pollerSession.completePortletProcessing(
132 pollerResponse.getPortletId(), message.getResponseId())) {
133
134 _pollerSessions.remove(pollerSessionId);
135 }
136 }
137 }
138
139 protected PollerRequest createPollerRequest(
140 boolean receiveRequest, PollerHeader pollerHeader, String portletId)
141 throws Exception {
142
143 return createPollerRequest(
144 receiveRequest, pollerHeader, portletId,
145 new HashMap<String, String>(), null);
146 }
147
148 protected PollerRequest createPollerRequest(
149 boolean receiveRequest, PollerHeader pollerHeader, String portletId,
150 Map<String, String> parameterMap, String chunkId)
151 throws Exception {
152
153 PollerProcessor pollerProcessor =
154 PollerProcessorUtil.getPollerProcessor(portletId);
155
156 if (pollerProcessor == null) {
157 if (_log.isWarnEnabled()) {
158 _log.warn(
159 "Poller processor not found for portlet " + portletId);
160 }
161
162 return null;
163 }
164
165 return new PollerRequest(
166 pollerHeader, portletId, parameterMap, chunkId, receiveRequest);
167 }
168
169 protected List<PollerRequest> createPollerRequests(
170 PollerHeader pollerHeader,
171 Map<String, Object>[] pollerRequestChunks, boolean receiveRequest)
172 throws Exception {
173
174 String[] portletIds = pollerHeader.getPortletIds();
175
176 List<PollerRequest> pollerRequests = new ArrayList<PollerRequest>(
177 portletIds.length);
178
179 Set<String> receiveRequestPortletIds = null;
180
181 if (receiveRequest) {
182 receiveRequestPortletIds = new HashSet<String>(
183 (int)(pollerRequestChunks.length / 0.75) + 1);
184 }
185
186 for (int i = 1; i < pollerRequestChunks.length; i++) {
187 Map<String, Object> pollerRequestChunk = pollerRequestChunks[i];
188
189 String portletId = (String)pollerRequestChunk.get("portletId");
190 Map<String, String> parameterMap = parseData(pollerRequestChunk);
191 String chunkId = (String)pollerRequestChunk.get("chunkId");
192
193 try {
194 PollerRequest pollerRequest = createPollerRequest(
195 receiveRequest, pollerHeader, portletId, parameterMap,
196 chunkId);
197
198 pollerRequests.add(pollerRequest);
199
200 if (receiveRequest) {
201 receiveRequestPortletIds.add(portletId);
202 }
203 }
204 catch (Exception e) {
205 _log.error(e, e);
206 }
207 }
208
209 if (receiveRequest) {
210 for (String portletId : portletIds) {
211 if (receiveRequestPortletIds.contains(portletId)) {
212 continue;
213 }
214
215 try {
216 PollerRequest pollerRequest = createPollerRequest(
217 receiveRequest, pollerHeader, portletId);
218
219 pollerRequests.add(pollerRequest);
220 }
221 catch (Exception e) {
222 _log.error(e, e);
223 }
224 }
225 }
226
227 return pollerRequests;
228 }
229
230 protected JSONObject createPollerResponseHeader(PollerHeader pollerHeader)
231 throws SystemException {
232
233 if (pollerHeader == null) {
234 return null;
235 }
236
237 boolean suspendPolling = false;
238
239 if (pollerHeader.isStartPolling()) {
240 BrowserTrackerLocalServiceUtil.updateBrowserTracker(
241 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
242 }
243 else {
244 BrowserTracker browserTracker =
245 BrowserTrackerLocalServiceUtil.getBrowserTracker(
246 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
247
248 if (browserTracker.getBrowserKey() !=
249 pollerHeader.getBrowserKey()) {
250
251 suspendPolling = true;
252 }
253 }
254
255 JSONObject pollerResponseHeaderJSONObject =
256 JSONFactoryUtil.createJSONObject();
257
258 pollerResponseHeaderJSONObject.put("userId", pollerHeader.getUserId());
259 pollerResponseHeaderJSONObject.put(
260 "initialRequest", pollerHeader.isInitialRequest());
261 pollerResponseHeaderJSONObject.put("suspendPolling", suspendPolling);
262
263 return pollerResponseHeaderJSONObject;
264 }
265
266 protected void executePollerRequests(
267 PollerSession pollerSession, List<PollerRequest> pollerRequests) {
268
269 for (PollerRequest pollerRequest : pollerRequests) {
270 PollerRequestResponsePair pollerRequestResponsePair =
271 new PollerRequestResponsePair(pollerRequest);
272
273 String responseId = null;
274
275 if (pollerRequest.isReceiveRequest()) {
276 responseId = PortalUUIDUtil.generate();
277
278 PollerResponse pollerResponse = new DefaultPollerResponse(
279 pollerRequest.getPollerHeader(),
280 pollerRequest.getPortletId(),
281 pollerRequest.getChunkId());
282
283 pollerRequestResponsePair.setPollerResponse(pollerResponse);
284
285 if (!pollerSession.beginPortletProcessing(
286 pollerRequestResponsePair, responseId)) {
287
288 continue;
289 }
290 }
291
292 Message message = new Message();
293
294 message.setPayload(pollerRequestResponsePair);
295
296 if (pollerRequest.isReceiveRequest()) {
297 message.setResponseId(responseId);
298
299 message.setResponseDestinationName(
300 DestinationNames.POLLER_RESPONSE);
301 }
302
303 MessageBusUtil.sendMessage(DestinationNames.POLLER, message);
304 }
305 }
306
307 protected String fixPollerRequestString(String pollerRequestString) {
308 if (Validator.isNull(pollerRequestString)) {
309 return null;
310 }
311
312 return StringUtil.replace(
313 pollerRequestString,
314 new String[] {
315 StringPool.OPEN_CURLY_BRACE,
316 StringPool.CLOSE_CURLY_BRACE,
317 _ESCAPED_OPEN_CURLY_BRACE,
318 _ESCAPED_CLOSE_CURLY_BRACE
319 },
320 new String[] {
321 _OPEN_HASH_MAP_WRAPPER,
322 StringPool.DOUBLE_CLOSE_CURLY_BRACE,
323 StringPool.OPEN_CURLY_BRACE,
324 StringPool.CLOSE_CURLY_BRACE
325 });
326 }
327
328 protected String getPollerSessionId(PollerHeader pollerHeader) {
329 return String.valueOf(pollerHeader.getUserId());
330 }
331
332 protected long getUserId(long companyId, String userIdString) {
333 long userId = 0;
334
335 try {
336 Company company = CompanyLocalServiceUtil.getCompany(companyId);
337
338 userId = GetterUtil.getLong(
339 Encryptor.decrypt(company.getKeyObj(), userIdString));
340 }
341 catch (Exception e) {
342 _log.error(
343 "Invalid credentials for company id " + companyId +
344 " and user id " + userIdString);
345 }
346
347 return userId;
348 }
349
350 protected boolean isReceiveRequest(String path) {
351 if ((path != null) && path.endsWith(_PATH_RECEIVE)) {
352 return true;
353 }
354 else {
355 return false;
356 }
357 }
358
359 protected Map<String, String> parseData(
360 Map<String, Object> pollerRequestChunk)
361 throws Exception {
362
363 Map<String, Object> oldParameterMap =
364 (Map<String, Object>)pollerRequestChunk.get("data");
365
366 Map<String, String> newParameterMap = new HashMap<String, String>();
367
368 if (oldParameterMap == null) {
369 return newParameterMap;
370 }
371
372 for (Map.Entry<String, Object> entry : oldParameterMap.entrySet()) {
373 newParameterMap.put(
374 entry.getKey(), String.valueOf(entry.getValue()));
375 }
376
377 return newParameterMap;
378 }
379
380 protected PollerHeader parsePollerRequestHeader(
381 Map<String, Object>[] pollerRequestChunks) {
382
383 if ((pollerRequestChunks == null) || (pollerRequestChunks.length < 1)) {
384 return null;
385 }
386
387 Map<String, Object> pollerRequestChunk = pollerRequestChunks[0];
388
389 long companyId = GetterUtil.getLong(
390 String.valueOf(pollerRequestChunk.get("companyId")));
391 String userIdString = GetterUtil.getString(
392 String.valueOf(pollerRequestChunk.get("userId")));
393 long browserKey = GetterUtil.getLong(
394 String.valueOf(pollerRequestChunk.get("browserKey")));
395 String[] portletIds = StringUtil.split(
396 String.valueOf(pollerRequestChunk.get("portletIds")));
397 boolean initialRequest = GetterUtil.getBoolean(
398 String.valueOf(pollerRequestChunk.get("initialRequest")));
399 boolean startPolling = GetterUtil.getBoolean(
400 String.valueOf(pollerRequestChunk.get("startPolling")));
401
402 long userId = getUserId(companyId, userIdString);
403
404 if (userId == 0) {
405 return null;
406 }
407
408 return new PollerHeader(
409 companyId, userId, browserKey, portletIds, initialRequest,
410 startPolling);
411 }
412
413 protected Map<String, Object>[] parsePollerRequestParameters(
414 String pollerRequestString) {
415
416 String fixedPollerRequestString = fixPollerRequestString(
417 pollerRequestString);
418
419 return (Map<String, Object>[])JSONFactoryUtil.deserialize(
420 fixedPollerRequestString);
421 }
422
423 private static final String _ESCAPED_CLOSE_CURLY_BRACE =
424 "[$CLOSE_CURLY_BRACE$]";
425
426 private static final String _ESCAPED_OPEN_CURLY_BRACE =
427 "[$OPEN_CURLY_BRACE$]";
428
429 private static final String _OPEN_HASH_MAP_WRAPPER =
430 "{\"javaClass\":\"java.util.HashMap\",\"map\":{";
431
432 private static final String _PATH_RECEIVE = "/receive";
433
434 private static Log _log = LogFactoryUtil.getLog(
435 PollerRequestHandlerImpl.class);
436
437 private Map<String, PollerSession> _pollerSessions =
438 new HashMap<String, PollerSession>();
439
440 }