001
014
015 package com.liferay.portal.poller;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONFactoryUtil;
019 import com.liferay.portal.kernel.json.JSONObject;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.DestinationNames;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBusUtil;
025 import com.liferay.portal.kernel.messaging.MessageListener;
026 import com.liferay.portal.kernel.poller.DefaultPollerResponse;
027 import com.liferay.portal.kernel.poller.PollerHeader;
028 import com.liferay.portal.kernel.poller.PollerProcessor;
029 import com.liferay.portal.kernel.poller.PollerRequest;
030 import com.liferay.portal.kernel.poller.PollerResponse;
031 import com.liferay.portal.kernel.util.GetterUtil;
032 import com.liferay.portal.kernel.util.StringPool;
033 import com.liferay.portal.kernel.util.StringUtil;
034 import com.liferay.portal.kernel.util.Validator;
035 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036 import com.liferay.portal.model.BrowserTracker;
037 import com.liferay.portal.model.Company;
038 import com.liferay.portal.service.BrowserTrackerLocalServiceUtil;
039 import com.liferay.portal.service.CompanyLocalServiceUtil;
040 import com.liferay.util.Encryptor;
041
042 import java.util.ArrayList;
043 import java.util.HashMap;
044 import java.util.HashSet;
045 import java.util.List;
046 import java.util.Map;
047 import java.util.Set;
048
049
054 public class PollerRequestHandlerImpl
055 implements PollerRequestHandler, MessageListener {
056
057 public PollerHeader getPollerHeader(String pollerRequestString) {
058 if (Validator.isNull(pollerRequestString)) {
059 return null;
060 }
061
062 Map<String, Object>[] pollerRequestChunks =
063 parsePollerRequestParameters(pollerRequestString);
064
065 return parsePollerRequestHeader(pollerRequestChunks);
066 }
067
068 public JSONObject processRequest(String path, String pollerRequestString)
069 throws Exception {
070
071 if (Validator.isNull(pollerRequestString)) {
072 return null;
073 }
074
075 Map<String, Object>[] pollerRequestChunks =
076 parsePollerRequestParameters(pollerRequestString);
077
078 PollerHeader pollerHeader = parsePollerRequestHeader(
079 pollerRequestChunks);
080
081 if (pollerHeader == null) {
082 return null;
083 }
084
085 boolean receiveRequest = isReceiveRequest(path);
086
087 String pollerSessionId = getPollerSessionId(pollerHeader);
088
089 PollerSession pollerSession = null;
090
091 synchronized (_pollerSessions) {
092 pollerSession = _pollerSessions.get(pollerSessionId);
093
094 if ((pollerSession == null) && receiveRequest) {
095 pollerSession = new PollerSession(pollerSessionId);
096
097 _pollerSessions.put(pollerSessionId, pollerSession);
098 }
099 }
100
101 List<PollerRequest> pollerRequests = createPollerRequests(
102 pollerHeader, pollerRequestChunks, receiveRequest);
103
104 executePollerRequests(pollerSession, pollerRequests);
105
106 if (receiveRequest) {
107 return createPollerResponseHeader(pollerHeader);
108 }
109 else {
110 return null;
111 }
112 }
113
114 public void receive(Message message) {
115 Object messagePayload = message.getPayload();
116
117 if (!(messagePayload instanceof PollerResponse)) {
118 return;
119 }
120
121 PollerResponse pollerResponse = (PollerResponse) messagePayload;
122
123 PollerHeader pollerHeader = pollerResponse.getPollerHeader();
124
125 String pollerSessionId = getPollerSessionId(pollerHeader);
126
127 synchronized (_pollerSessions) {
128 PollerSession pollerSession = _pollerSessions.get(
129 pollerSessionId);
130
131 if ((pollerSession != null) &&
132 pollerSession.completePortletProcessing(
133 pollerResponse.getPortletId(), message.getResponseId())) {
134
135 _pollerSessions.remove(pollerSessionId);
136 }
137 }
138 }
139
140 protected PollerRequest createPollerRequest(
141 boolean receiveRequest, PollerHeader pollerHeader, String portletId)
142 throws Exception {
143
144 return createPollerRequest(
145 receiveRequest, pollerHeader, portletId,
146 new HashMap<String, String>(), null);
147 }
148
149 protected PollerRequest createPollerRequest(
150 boolean receiveRequest, PollerHeader pollerHeader, String portletId,
151 Map<String, String> parameterMap, String chunkId)
152 throws Exception {
153
154 PollerProcessor pollerProcessor =
155 PollerProcessorUtil.getPollerProcessor(portletId);
156
157 if (pollerProcessor == null) {
158 if (_log.isWarnEnabled()) {
159 _log.warn(
160 "Poller processor not found for portlet " + portletId);
161 }
162
163 return null;
164 }
165
166 return new PollerRequest(
167 pollerHeader, portletId, parameterMap, chunkId, receiveRequest);
168 }
169
170 protected List<PollerRequest> createPollerRequests(
171 PollerHeader pollerHeader,
172 Map<String, Object>[] pollerRequestChunks, boolean receiveRequest)
173 throws Exception {
174
175 String[] portletIds = pollerHeader.getPortletIds();
176
177 List<PollerRequest> pollerRequests = new ArrayList<PollerRequest>(
178 portletIds.length);
179
180 Set<String> receiveRequestPortletIds = null;
181
182 if (receiveRequest) {
183 receiveRequestPortletIds = new HashSet<String>(
184 (int)(pollerRequestChunks.length / 0.75) + 1);
185 }
186
187 for (int i = 1; i < pollerRequestChunks.length; i++) {
188 Map<String, Object> pollerRequestChunk = pollerRequestChunks[i];
189
190 String portletId = (String)pollerRequestChunk.get("portletId");
191 Map<String, String> parameterMap = parseData(pollerRequestChunk);
192 String chunkId = (String)pollerRequestChunk.get("chunkId");
193
194 try {
195 PollerRequest pollerRequest = createPollerRequest(
196 receiveRequest, pollerHeader, portletId, parameterMap,
197 chunkId);
198
199 pollerRequests.add(pollerRequest);
200
201 if (receiveRequest) {
202 receiveRequestPortletIds.add(portletId);
203 }
204 }
205 catch (Exception e) {
206 _log.error(e, e);
207 }
208 }
209
210 if (receiveRequest) {
211 for (String portletId : portletIds) {
212 if (receiveRequestPortletIds.contains(portletId)) {
213 continue;
214 }
215
216 try {
217 PollerRequest pollerRequest = createPollerRequest(
218 receiveRequest, pollerHeader, portletId);
219
220 pollerRequests.add(pollerRequest);
221 }
222 catch (Exception e) {
223 _log.error(e, e);
224 }
225 }
226 }
227
228 return pollerRequests;
229 }
230
231 protected JSONObject createPollerResponseHeader(PollerHeader pollerHeader)
232 throws SystemException {
233
234 if (pollerHeader == null) {
235 return null;
236 }
237
238 boolean suspendPolling = false;
239
240 if (pollerHeader.isStartPolling()) {
241 BrowserTrackerLocalServiceUtil.updateBrowserTracker(
242 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
243 }
244 else {
245 BrowserTracker browserTracker =
246 BrowserTrackerLocalServiceUtil.getBrowserTracker(
247 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
248
249 if (browserTracker.getBrowserKey() !=
250 pollerHeader.getBrowserKey()) {
251
252 suspendPolling = true;
253 }
254 }
255
256 JSONObject pollerResponseHeaderJSONObject =
257 JSONFactoryUtil.createJSONObject();
258
259 pollerResponseHeaderJSONObject.put(
260 "userId", pollerHeader.getUserId());
261 pollerResponseHeaderJSONObject.put(
262 "initialRequest", pollerHeader.isInitialRequest());
263 pollerResponseHeaderJSONObject.put("suspendPolling", suspendPolling);
264
265 return pollerResponseHeaderJSONObject;
266 }
267
268 protected void executePollerRequests(
269 PollerSession pollerSession, List<PollerRequest> pollerRequests) {
270
271 for (PollerRequest pollerRequest : pollerRequests) {
272 PollerRequestResponsePair pollerRequestResponsePair =
273 new PollerRequestResponsePair(pollerRequest);
274
275 String responseId = null;
276
277 if (pollerRequest.isReceiveRequest()) {
278 responseId = PortalUUIDUtil.generate();
279
280 PollerResponse pollerResponse = new DefaultPollerResponse(
281 pollerRequest.getPollerHeader(),
282 pollerRequest.getPortletId(),
283 pollerRequest.getChunkId());
284
285 pollerRequestResponsePair.setPollerResponse(pollerResponse);
286
287 if (!pollerSession.beginPortletProcessing(
288 pollerRequestResponsePair, responseId)) {
289
290 continue;
291 }
292 }
293
294 Message message = new Message();
295
296 message.setPayload(pollerRequestResponsePair);
297
298 if (pollerRequest.isReceiveRequest()) {
299 message.setResponseId(responseId);
300
301 message.setResponseDestinationName(
302 DestinationNames.POLLER_RESPONSE);
303 }
304
305 MessageBusUtil.sendMessage(DestinationNames.POLLER, message);
306 }
307 }
308
309 protected String fixPollerRequestString(String pollerRequestString) {
310 if (Validator.isNull(pollerRequestString)) {
311 return null;
312 }
313
314 return StringUtil.replace(
315 pollerRequestString,
316 new String[] {
317 StringPool.OPEN_CURLY_BRACE,
318 StringPool.CLOSE_CURLY_BRACE,
319 _ESCAPED_OPEN_CURLY_BRACE,
320 _ESCAPED_CLOSE_CURLY_BRACE
321 },
322 new String[] {
323 _OPEN_HASH_MAP_WRAPPER,
324 StringPool.DOUBLE_CLOSE_CURLY_BRACE,
325 StringPool.OPEN_CURLY_BRACE,
326 StringPool.CLOSE_CURLY_BRACE
327 });
328 }
329
330 protected String getPollerSessionId(PollerHeader pollerHeader) {
331 return String.valueOf(pollerHeader.getUserId());
332 }
333
334 protected long getUserId(long companyId, String userIdString) {
335 long userId = 0;
336
337 try {
338 Company company = CompanyLocalServiceUtil.getCompany(companyId);
339
340 userId = GetterUtil.getLong(
341 Encryptor.decrypt(company.getKeyObj(), userIdString));
342 }
343 catch (Exception e) {
344 _log.error(
345 "Invalid credentials for company id " + companyId +
346 " and user id " + userIdString);
347 }
348
349 return userId;
350 }
351
352 protected boolean isReceiveRequest(String path) {
353 if ((path != null) && path.endsWith(_PATH_RECEIVE)) {
354 return true;
355 }
356 else {
357 return false;
358 }
359 }
360
361 protected Map<String, String> parseData(
362 Map<String, Object> pollerRequestChunk)
363 throws Exception {
364
365 Map<String, Object> oldParameterMap =
366 (Map<String, Object>)pollerRequestChunk.get("data");
367
368 Map<String, String> newParameterMap = new HashMap<String, String>();
369
370 if (oldParameterMap == null) {
371 return newParameterMap;
372 }
373
374 for (Map.Entry<String, Object> entry : oldParameterMap.entrySet()) {
375 newParameterMap.put(
376 entry.getKey(), String.valueOf(entry.getValue()));
377 }
378
379 return newParameterMap;
380 }
381
382 protected PollerHeader parsePollerRequestHeader(
383 Map<String, Object>[] pollerRequestChunks) {
384
385 if ((pollerRequestChunks == null) || (pollerRequestChunks.length < 1)) {
386 return null;
387 }
388
389 Map<String, Object> pollerRequestChunk = pollerRequestChunks[0];
390
391 long companyId = GetterUtil.getLong(
392 String.valueOf(pollerRequestChunk.get("companyId")));
393 String userIdString = GetterUtil.getString(
394 String.valueOf(pollerRequestChunk.get("userId")));
395 long browserKey = GetterUtil.getLong(
396 String.valueOf(pollerRequestChunk.get("browserKey")));
397 String[] portletIds = StringUtil.split(
398 String.valueOf(pollerRequestChunk.get("portletIds")));
399 boolean initialRequest = GetterUtil.getBoolean(
400 String.valueOf(pollerRequestChunk.get("initialRequest")));
401 boolean startPolling = GetterUtil.getBoolean(
402 String.valueOf(pollerRequestChunk.get("startPolling")));
403
404 long userId = getUserId(companyId, userIdString);
405
406 if (userId == 0) {
407 return null;
408 }
409
410 return new PollerHeader(
411 companyId, userId, browserKey, portletIds, initialRequest,
412 startPolling);
413 }
414
415 protected Map<String, Object>[] parsePollerRequestParameters(
416 String pollerRequestString) {
417
418 String fixedPollerRequestString = fixPollerRequestString(
419 pollerRequestString);
420
421 return (Map<String, Object>[])JSONFactoryUtil.deserialize(
422 fixedPollerRequestString);
423 }
424
425 private static final String _ESCAPED_CLOSE_CURLY_BRACE =
426 "[$CLOSE_CURLY_BRACE$]";
427
428 private static final String _ESCAPED_OPEN_CURLY_BRACE =
429 "[$OPEN_CURLY_BRACE$]";
430
431 private static final String _OPEN_HASH_MAP_WRAPPER =
432 "{\"javaClass\":\"java.util.HashMap\",\"map\":{";
433
434 private static final String _PATH_RECEIVE = "/receive";
435
436 private static Log _log = LogFactoryUtil.getLog(
437 PollerRequestHandlerImpl.class);
438
439 private Map<String, PollerSession> _pollerSessions =
440 new HashMap<String, PollerSession>();
441
442 }