001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
/**
 * Creates a channel for the <code>CompanyConstants.SYSTEM</code> company
 * with a user ID of <code>0</code>.
 */
public ChannelImpl() {
	this(CompanyConstants.SYSTEM, 0L);
}
056
/**
 * Creates a channel for the given company and user by delegating to the
 * superclass constructor.
 *
 * @param companyId the company ID handed to the superclass
 * @param userId the user ID handed to the superclass
 */
public ChannelImpl(long companyId, long userId) {
	super(companyId, userId);
}
060
061 public Channel clone(long companyId, long userId) {
062 return new ChannelImpl(companyId, userId);
063 }
064
065 public void confirmDelivery(Collection<String> notificationEventUuids)
066 throws ChannelException {
067
068 confirmDelivery(notificationEventUuids, false);
069 }
070
071 public void confirmDelivery(
072 Collection<String> notificationEventUuids, boolean archive)
073 throws ChannelException {
074
075 _reentrantLock.lock();
076
077 try {
078 for (String notificationEventUuid : notificationEventUuids) {
079 Map<String, NotificationEvent> unconfirmedNotificationEvents =
080 _getUnconfirmedNotificationEvents();
081
082 unconfirmedNotificationEvents.remove(notificationEventUuid);
083 }
084
085 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
086 if (archive) {
087 UserNotificationEventLocalServiceUtil.
088 updateUserNotificationEvents(
089 notificationEventUuids, archive);
090 }
091 else {
092 UserNotificationEventLocalServiceUtil.
093 deleteUserNotificationEvents(notificationEventUuids);
094 }
095 }
096 }
097 catch (Exception e) {
098 throw new ChannelException(
099 "Unable to confirm delivery for user " + getUserId() , e);
100 }
101 finally {
102 _reentrantLock.unlock();
103 }
104 }
105
106 public void confirmDelivery(String notificationEventUuid)
107 throws ChannelException {
108
109 confirmDelivery(notificationEventUuid, false);
110 }
111
112 public void confirmDelivery(String notificationEventUuid, boolean archive)
113 throws ChannelException {
114
115 _reentrantLock.lock();
116
117 try {
118 Map<String, NotificationEvent> unconfirmedNotificationEvents =
119 _getUnconfirmedNotificationEvents();
120
121 unconfirmedNotificationEvents.remove(notificationEventUuid);
122
123 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
124 if (archive) {
125 UserNotificationEventLocalServiceUtil.
126 updateUserNotificationEvent(
127 notificationEventUuid, archive);
128 }
129 else {
130 UserNotificationEventLocalServiceUtil.
131 deleteUserNotificationEvent(notificationEventUuid);
132 }
133 }
134 }
135 catch (Exception e) {
136 throw new ChannelException(
137 "Uanble to confirm delivery for " + notificationEventUuid , e);
138 }
139 finally {
140 _reentrantLock.unlock();
141 }
142 }
143
144 public void deleteUserNotificiationEvent(String notificationEventUuid)
145 throws ChannelException {
146
147 try {
148 UserNotificationEventLocalServiceUtil.
149 deleteUserNotificationEvent(notificationEventUuid);
150 }
151 catch (Exception e) {
152 throw new ChannelException(
153 "Uanble to delete event " + notificationEventUuid ,
154 e);
155 }
156 }
157
158 public void deleteUserNotificiationEvents(
159 Collection<String> notificationEventUuids)
160 throws ChannelException {
161
162 try {
163 UserNotificationEventLocalServiceUtil.
164 deleteUserNotificationEvents(notificationEventUuids);
165 }
166 catch (Exception e) {
167 throw new ChannelException(
168 "Uanble to delete events for user " + getUserId() , e);
169 }
170 }
171
172 public void flush() {
173 _reentrantLock.lock();
174
175 try {
176 if (_notificationEvents != null) {
177 _notificationEvents.clear();
178 }
179 }
180 finally {
181 _reentrantLock.unlock();
182 }
183 }
184
185 public void flush(long timestamp) {
186 _reentrantLock.lock();
187
188 try {
189 if (_notificationEvents == null) {
190 return;
191 }
192
193 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
194
195 while (itr.hasNext()) {
196 NotificationEvent notificationEvent = itr.next();
197
198 if (notificationEvent.getTimestamp() < timestamp) {
199 itr.remove();
200 }
201 }
202 }
203 finally {
204 _reentrantLock.unlock();
205 }
206 }
207
208 public List<NotificationEvent> getNotificationEvents(boolean flush)
209 throws ChannelException {
210
211 _reentrantLock.lock();
212
213 try {
214 return doGetNotificationEvents(flush);
215 }
216 catch (ChannelException ce) {
217 throw ce;
218 }
219 catch (Exception e) {
220 throw new ChannelException(e);
221 }
222 finally {
223 _reentrantLock.unlock();
224 }
225 }
226
227 public void init() throws ChannelException {
228 _reentrantLock.lock();
229
230 try {
231 doInit();
232 }
233 catch (SystemException se) {
234 throw new ChannelException(
235 "Unable to init channel " + getUserId(), se);
236 }
237 finally {
238 _reentrantLock.unlock();
239 }
240 }
241
242 public void removeTransientNotificationEvents(
243 Collection<NotificationEvent> notificationEvents) {
244
245 _reentrantLock.lock();
246
247 try {
248 if (_notificationEvents != null) {
249 _notificationEvents.removeAll(notificationEvents);
250 }
251 }
252 finally {
253 _reentrantLock.unlock();
254 }
255 }
256
257 public void removeTransientNotificationEventsByUuid(
258 Collection<String> notificationEventUuids) {
259
260 Set<String> notificationEventUuidsSet = new HashSet<String>(
261 notificationEventUuids);
262
263 _reentrantLock.lock();
264
265 try {
266 if (_notificationEvents == null) {
267 return;
268 }
269
270 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
271
272 while (itr.hasNext()) {
273 NotificationEvent notificationEvent = itr.next();
274
275 if (notificationEventUuidsSet.contains(
276 notificationEvent.getUuid())) {
277
278 itr.remove();
279 }
280 }
281 }
282 finally {
283 _reentrantLock.unlock();
284 }
285 }
286
287 public void sendNotificationEvent(NotificationEvent notificationEvent)
288 throws ChannelException {
289
290 _reentrantLock.lock();
291
292 try {
293 long currentTime = System.currentTimeMillis();
294
295 storeNotificationEvent(notificationEvent, currentTime);
296
297 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
298 notificationEvent.isDeliveryRequired()) {
299
300 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
301 getUserId(), notificationEvent);
302 }
303 }
304 catch (Exception e) {
305 throw new ChannelException("Unable to send event", e);
306 }
307 finally {
308 _reentrantLock.unlock();
309 }
310
311 notifyChannelListeners();
312 }
313
314 public void sendNotificationEvents(
315 Collection<NotificationEvent> notificationEvents)
316 throws ChannelException {
317
318 _reentrantLock.lock();
319
320 try {
321 long currentTime = System.currentTimeMillis();
322
323 List<NotificationEvent> persistedNotificationEvents =
324 new ArrayList<NotificationEvent>(notificationEvents.size());
325
326 for (NotificationEvent notificationEvent : notificationEvents) {
327 storeNotificationEvent(notificationEvent, currentTime);
328
329 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
330 notificationEvent.isDeliveryRequired()) {
331
332 persistedNotificationEvents.add(notificationEvent);
333 }
334 }
335
336 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
337 !persistedNotificationEvents.isEmpty()) {
338
339 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
340 getUserId(), persistedNotificationEvents);
341 }
342 }
343 catch (Exception e) {
344 throw new ChannelException("Unable to send event", e);
345 }
346 finally {
347 _reentrantLock.unlock();
348 }
349
350 notifyChannelListeners();
351 }
352
353 @Override
354 protected void doCleanUp() throws Exception {
355 _reentrantLock.lock();
356
357 try {
358 long currentTime = System.currentTimeMillis();
359
360 TreeSet<NotificationEvent> notificationEvents =
361 _getNotificationEvents();
362
363 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
364
365 while (itr1.hasNext()) {
366 NotificationEvent notificationEvent = itr1.next();
367
368 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
369 itr1.remove();
370 }
371 }
372
373 Map<String, NotificationEvent> unconfirmedNotificationEvents =
374 _getUnconfirmedNotificationEvents();
375
376 List<String> invalidNotificationEventUuids = new ArrayList<String>(
377 unconfirmedNotificationEvents.size());
378
379 Set<Map.Entry<String, NotificationEvent>>
380 unconfirmedNotificationEventsSet =
381 unconfirmedNotificationEvents.entrySet();
382
383 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
384 unconfirmedNotificationEventsSet.iterator();
385
386 while (itr2.hasNext()) {
387 Map.Entry<String, NotificationEvent> entry = itr2.next();
388
389 NotificationEvent notificationEvent = entry.getValue();
390
391 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
392 invalidNotificationEventUuids.add(entry.getKey());
393
394 itr2.remove();
395 }
396 }
397
398 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
399 !invalidNotificationEventUuids.isEmpty()) {
400
401 UserNotificationEventLocalServiceUtil.
402 deleteUserNotificationEvents(invalidNotificationEventUuids);
403 }
404 }
405 catch (Exception e) {
406 throw new ChannelException(
407 "Unable to clean up channel " + getUserId(), e);
408 }
409 finally {
410 _reentrantLock.unlock();
411 }
412 }
413
414 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
415 throws Exception {
416
417 long currentTime = System.currentTimeMillis();
418
419 TreeSet<NotificationEvent> notificationEventsSet =
420 _getNotificationEvents();
421
422 Map<String, NotificationEvent> unconfirmedNotificationEvents =
423 _getUnconfirmedNotificationEvents();
424
425 List<NotificationEvent> notificationEvents =
426 new ArrayList<NotificationEvent>(
427 notificationEventsSet.size() +
428 unconfirmedNotificationEvents.size());
429
430 for (NotificationEvent notificationEvent : notificationEventsSet) {
431 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
432 break;
433 }
434 else {
435 notificationEvents.add(notificationEvent);
436 }
437 }
438
439 if (flush) {
440 notificationEventsSet.clear();
441 }
442 else if (notificationEventsSet.size() != notificationEvents.size()) {
443 notificationEventsSet.retainAll(notificationEvents);
444 }
445
446 List<String> invalidNotificationEventUuids = new ArrayList<String>(
447 unconfirmedNotificationEvents.size());
448
449 Set<Map.Entry<String, NotificationEvent>>
450 unconfirmedNotificationEventsSet =
451 unconfirmedNotificationEvents.entrySet();
452
453 Iterator<Map.Entry<String, NotificationEvent>> itr =
454 unconfirmedNotificationEventsSet.iterator();
455
456 while (itr.hasNext()) {
457 Map.Entry<String, NotificationEvent> entry = itr.next();
458
459 NotificationEvent notificationEvent = entry.getValue();
460
461 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
462 !notificationEvent.isArchived()) {
463
464 invalidNotificationEventUuids.add(notificationEvent.getUuid());
465
466 itr.remove();
467 }
468 else {
469 notificationEvents.add(entry.getValue());
470 }
471 }
472
473 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
474 !invalidNotificationEventUuids.isEmpty()) {
475
476 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
477 invalidNotificationEventUuids);
478 }
479
480 return notificationEvents;
481 }
482
483 protected void doInit() throws SystemException {
484 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
485 return;
486 }
487
488 List<UserNotificationEvent> userNotificationEvents =
489 UserNotificationEventLocalServiceUtil.getUserNotificationEvents(
490 getUserId(), false);
491
492 Map<String, NotificationEvent> unconfirmedNotificationEvents =
493 _getUnconfirmedNotificationEvents();
494
495 List<String> invalidNotificationEventUuids = new ArrayList<String>(
496 unconfirmedNotificationEvents.size());
497
498 long currentTime = System.currentTimeMillis();
499
500 for (UserNotificationEvent persistedNotificationEvent :
501 userNotificationEvents) {
502
503 try {
504 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
505 persistedNotificationEvent.getPayload());
506
507 NotificationEvent notificationEvent =
508 NotificationEventFactoryUtil.createNotificationEvent(
509 persistedNotificationEvent.getTimestamp(),
510 persistedNotificationEvent.getType(),
511 payloadJSONObject);
512
513 notificationEvent.setDeliveryRequired(
514 persistedNotificationEvent.getDeliverBy());
515
516 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
517
518 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
519 invalidNotificationEventUuids.add(
520 notificationEvent.getUuid());
521 }
522 else {
523 unconfirmedNotificationEvents.put(
524 notificationEvent.getUuid(), notificationEvent);
525 }
526 }
527 catch (JSONException jsone) {
528 _log.error(jsone, jsone);
529
530 invalidNotificationEventUuids.add(
531 persistedNotificationEvent.getUuid());
532 }
533 }
534
535 if (!invalidNotificationEventUuids.isEmpty()) {
536 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
537 invalidNotificationEventUuids);
538 }
539 }
540
541 protected boolean isRemoveNotificationEvent(
542 NotificationEvent notificationEvent, long currentTime) {
543
544 if ((notificationEvent.getDeliverBy() != 0) &&
545 (notificationEvent.getDeliverBy() <= currentTime)) {
546
547 return true;
548 }
549 else {
550 return false;
551 }
552 }
553
554 protected void storeNotificationEvent(
555 NotificationEvent notificationEvent, long currentTime) {
556
557 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
558 return;
559 }
560
561 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
562 notificationEvent.isDeliveryRequired()) {
563
564 Map<String, NotificationEvent> unconfirmedNotificationEvents =
565 _getUnconfirmedNotificationEvents();
566
567 unconfirmedNotificationEvents.put(
568 notificationEvent.getUuid(), notificationEvent);
569 }
570 else {
571 TreeSet<NotificationEvent> notificationEvents =
572 _getNotificationEvents();
573
574 notificationEvents.add(notificationEvent);
575
576 if (notificationEvents.size() >
577 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
578
579 NotificationEvent firstNotificationEvent =
580 notificationEvents.first();
581
582 notificationEvents.remove(firstNotificationEvent);
583 }
584 }
585 }
586
587 private TreeSet<NotificationEvent> _getNotificationEvents() {
588 if (_notificationEvents == null) {
589 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
590 }
591
592 return _notificationEvents;
593 }
594
595 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
596 if (_unconfirmedNotificationEvents == null) {
597 _unconfirmedNotificationEvents =
598 new HashMap<String, NotificationEvent>();
599 }
600
601 return _unconfirmedNotificationEvents;
602 }
603
604 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
605
606 private static Comparator<NotificationEvent> _comparator =
607 new NotificationEventComparator();
608
609 private TreeSet<NotificationEvent> _notificationEvents;
610 private ReentrantLock _reentrantLock = new ReentrantLock();
611 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
612
613 }