001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashSet;
038 import java.util.Iterator;
039 import java.util.LinkedHashMap;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 @Override
062 public Channel clone(long companyId, long userId) {
063 return new ChannelImpl(companyId, userId);
064 }
065
066 @Override
067 public void confirmDelivery(Collection<String> notificationEventUuids)
068 throws ChannelException {
069
070 confirmDelivery(notificationEventUuids, false);
071 }
072
073 @Override
074 public void confirmDelivery(
075 Collection<String> notificationEventUuids, boolean archive)
076 throws ChannelException {
077
078 _reentrantLock.lock();
079
080 try {
081 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
082 if (archive) {
083 UserNotificationEventLocalServiceUtil.
084 updateUserNotificationEvents(
085 notificationEventUuids, getCompanyId(), archive);
086 }
087 else {
088 UserNotificationEventLocalServiceUtil.
089 deleteUserNotificationEvents(
090 notificationEventUuids, getCompanyId());
091 }
092 }
093
094 for (String notificationEventUuid : notificationEventUuids) {
095 Map<String, NotificationEvent> unconfirmedNotificationEvents =
096 _getUnconfirmedNotificationEvents();
097
098 unconfirmedNotificationEvents.remove(notificationEventUuid);
099 }
100 }
101 catch (Exception e) {
102 throw new ChannelException(
103 "Unable to confirm delivery for user " + getUserId(), e);
104 }
105 finally {
106 _reentrantLock.unlock();
107 }
108 }
109
110 @Override
111 public void confirmDelivery(String notificationEventUuid)
112 throws ChannelException {
113
114 confirmDelivery(notificationEventUuid, false);
115 }
116
117 @Override
118 public void confirmDelivery(String notificationEventUuid, boolean archive)
119 throws ChannelException {
120
121 _reentrantLock.lock();
122
123 try {
124 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
125 if (archive) {
126 UserNotificationEventLocalServiceUtil.
127 updateUserNotificationEvent(
128 notificationEventUuid, getCompanyId(), archive);
129 }
130 else {
131 UserNotificationEventLocalServiceUtil.
132 deleteUserNotificationEvent(
133 notificationEventUuid, getCompanyId());
134 }
135 }
136
137 Map<String, NotificationEvent> unconfirmedNotificationEvents =
138 _getUnconfirmedNotificationEvents();
139
140 unconfirmedNotificationEvents.remove(notificationEventUuid);
141 }
142 catch (Exception e) {
143 throw new ChannelException(
144 "Unable to confirm delivery for " + notificationEventUuid, e);
145 }
146 finally {
147 _reentrantLock.unlock();
148 }
149 }
150
151 @Override
152 public void deleteUserNotificiationEvent(String notificationEventUuid)
153 throws ChannelException {
154
155 _reentrantLock.lock();
156
157 try {
158 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
159 notificationEventUuid, getCompanyId());
160
161 Map<String, NotificationEvent> unconfirmedNotificationEvents =
162 _getUnconfirmedNotificationEvents();
163
164 unconfirmedNotificationEvents.remove(notificationEventUuid);
165 }
166 catch (Exception e) {
167 throw new ChannelException(
168 "Unable to delete event " + notificationEventUuid, e);
169 }
170 finally {
171 _reentrantLock.unlock();
172 }
173 }
174
175 @Override
176 public void deleteUserNotificiationEvents(
177 Collection<String> notificationEventUuids)
178 throws ChannelException {
179
180 _reentrantLock.lock();
181
182 try {
183 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
184 notificationEventUuids, getCompanyId());
185
186 for (String notificationEventUuid : notificationEventUuids) {
187 Map<String, NotificationEvent> unconfirmedNotificationEvents =
188 _getUnconfirmedNotificationEvents();
189
190 unconfirmedNotificationEvents.remove(notificationEventUuid);
191 }
192 }
193 catch (Exception e) {
194 throw new ChannelException(
195 "Unable to delete events for user " + getUserId(), e);
196 }
197 finally {
198 _reentrantLock.unlock();
199 }
200 }
201
202 @Override
203 public void flush() {
204 _reentrantLock.lock();
205
206 try {
207 if (_notificationEvents != null) {
208 _notificationEvents.clear();
209 }
210 }
211 finally {
212 _reentrantLock.unlock();
213 }
214 }
215
216 @Override
217 public void flush(long timestamp) {
218 _reentrantLock.lock();
219
220 try {
221 if (_notificationEvents == null) {
222 return;
223 }
224
225 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
226
227 while (itr.hasNext()) {
228 NotificationEvent notificationEvent = itr.next();
229
230 if (notificationEvent.getTimestamp() < timestamp) {
231 itr.remove();
232 }
233 }
234 }
235 finally {
236 _reentrantLock.unlock();
237 }
238 }
239
240 @Override
241 public List<NotificationEvent> getNotificationEvents(boolean flush)
242 throws ChannelException {
243
244 _reentrantLock.lock();
245
246 try {
247 return doGetNotificationEvents(flush);
248 }
249 catch (ChannelException ce) {
250 throw ce;
251 }
252 catch (Exception e) {
253 throw new ChannelException(e);
254 }
255 finally {
256 _reentrantLock.unlock();
257 }
258 }
259
260 @Override
261 public void init() throws ChannelException {
262 _reentrantLock.lock();
263
264 try {
265 doInit();
266 }
267 catch (SystemException se) {
268 throw new ChannelException(
269 "Unable to init channel " + getUserId(), se);
270 }
271 finally {
272 _reentrantLock.unlock();
273 }
274 }
275
276 @Override
277 public void removeTransientNotificationEvents(
278 Collection<NotificationEvent> notificationEvents) {
279
280 _reentrantLock.lock();
281
282 try {
283 if (_notificationEvents != null) {
284 _notificationEvents.removeAll(notificationEvents);
285 }
286 }
287 finally {
288 _reentrantLock.unlock();
289 }
290 }
291
292 @Override
293 public void removeTransientNotificationEventsByUuid(
294 Collection<String> notificationEventUuids) {
295
296 Set<String> notificationEventUuidsSet = new HashSet<>(
297 notificationEventUuids);
298
299 _reentrantLock.lock();
300
301 try {
302 if (_notificationEvents == null) {
303 return;
304 }
305
306 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
307
308 while (itr.hasNext()) {
309 NotificationEvent notificationEvent = itr.next();
310
311 if (notificationEventUuidsSet.contains(
312 notificationEvent.getUuid())) {
313
314 itr.remove();
315 }
316 }
317 }
318 finally {
319 _reentrantLock.unlock();
320 }
321 }
322
323 @Override
324 public void sendNotificationEvent(NotificationEvent notificationEvent)
325 throws ChannelException {
326
327 _reentrantLock.lock();
328
329 try {
330 long currentTime = System.currentTimeMillis();
331
332 doStoreNotificationEvent(notificationEvent, currentTime);
333
334 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
335 notificationEvent.isDeliveryRequired()) {
336
337 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
338 getUserId(), notificationEvent);
339 }
340 }
341 catch (Exception e) {
342 throw new ChannelException("Unable to send event", e);
343 }
344 finally {
345 _reentrantLock.unlock();
346 }
347
348 notifyChannelListeners();
349 }
350
351 @Override
352 public void sendNotificationEvents(
353 Collection<NotificationEvent> notificationEvents)
354 throws ChannelException {
355
356 _reentrantLock.lock();
357
358 try {
359 long currentTime = System.currentTimeMillis();
360
361 List<NotificationEvent> persistedNotificationEvents =
362 new ArrayList<>(notificationEvents.size());
363
364 for (NotificationEvent notificationEvent : notificationEvents) {
365 doStoreNotificationEvent(notificationEvent, currentTime);
366
367 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
368 notificationEvent.isDeliveryRequired()) {
369
370 persistedNotificationEvents.add(notificationEvent);
371 }
372 }
373
374 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
375 !persistedNotificationEvents.isEmpty()) {
376
377 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
378 getUserId(), persistedNotificationEvents);
379 }
380 }
381 catch (Exception e) {
382 throw new ChannelException("Unable to send event", e);
383 }
384 finally {
385 _reentrantLock.unlock();
386 }
387
388 notifyChannelListeners();
389 }
390
391 @Override
392 public void storeNotificationEvent(
393 NotificationEvent notificationEvent, long currentTime) {
394
395 _reentrantLock.lock();
396
397 try {
398 doStoreNotificationEvent(notificationEvent, currentTime);
399 }
400 finally {
401 _reentrantLock.unlock();
402 }
403 }
404
405 @Override
406 protected void doCleanUp() throws Exception {
407 _reentrantLock.lock();
408
409 try {
410 long currentTime = System.currentTimeMillis();
411
412 TreeSet<NotificationEvent> notificationEvents =
413 _getNotificationEvents();
414
415 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
416
417 while (itr1.hasNext()) {
418 NotificationEvent notificationEvent = itr1.next();
419
420 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
421 itr1.remove();
422 }
423 }
424
425 Map<String, NotificationEvent> unconfirmedNotificationEvents =
426 _getUnconfirmedNotificationEvents();
427
428 List<String> invalidNotificationEventUuids = new ArrayList<>(
429 unconfirmedNotificationEvents.size());
430
431 Set<Map.Entry<String, NotificationEvent>>
432 unconfirmedNotificationEventsSet =
433 unconfirmedNotificationEvents.entrySet();
434
435 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
436 unconfirmedNotificationEventsSet.iterator();
437
438 while (itr2.hasNext()) {
439 Map.Entry<String, NotificationEvent> entry = itr2.next();
440
441 NotificationEvent notificationEvent = entry.getValue();
442
443 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
444 invalidNotificationEventUuids.add(entry.getKey());
445
446 itr2.remove();
447 }
448 }
449
450 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
451 !invalidNotificationEventUuids.isEmpty()) {
452
453 UserNotificationEventLocalServiceUtil.
454 deleteUserNotificationEvents(
455 invalidNotificationEventUuids, getCompanyId());
456 }
457 }
458 catch (Exception e) {
459 throw new ChannelException(
460 "Unable to clean up channel " + getUserId(), e);
461 }
462 finally {
463 _reentrantLock.unlock();
464 }
465 }
466
467 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
468 throws Exception {
469
470 long currentTime = System.currentTimeMillis();
471
472 TreeSet<NotificationEvent> notificationEventsSet =
473 _getNotificationEvents();
474
475 Map<String, NotificationEvent> unconfirmedNotificationEvents =
476 _getUnconfirmedNotificationEvents();
477
478 List<NotificationEvent> notificationEvents =
479 new ArrayList<NotificationEvent>(
480 notificationEventsSet.size() +
481 unconfirmedNotificationEvents.size());
482
483 for (NotificationEvent notificationEvent : notificationEventsSet) {
484 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
485 break;
486 }
487 else {
488 notificationEvents.add(notificationEvent);
489 }
490 }
491
492 if (flush) {
493 notificationEventsSet.clear();
494 }
495 else if (notificationEventsSet.size() != notificationEvents.size()) {
496 notificationEventsSet.retainAll(notificationEvents);
497 }
498
499 List<String> invalidNotificationEventUuids = new ArrayList<>(
500 unconfirmedNotificationEvents.size());
501
502 Set<Map.Entry<String, NotificationEvent>>
503 unconfirmedNotificationEventsSet =
504 unconfirmedNotificationEvents.entrySet();
505
506 Iterator<Map.Entry<String, NotificationEvent>> itr =
507 unconfirmedNotificationEventsSet.iterator();
508
509 while (itr.hasNext()) {
510 Map.Entry<String, NotificationEvent> entry = itr.next();
511
512 NotificationEvent notificationEvent = entry.getValue();
513
514 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
515 !notificationEvent.isArchived()) {
516
517 invalidNotificationEventUuids.add(notificationEvent.getUuid());
518
519 itr.remove();
520 }
521 else {
522 notificationEvents.add(entry.getValue());
523 }
524 }
525
526 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
527 !invalidNotificationEventUuids.isEmpty()) {
528
529 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
530 invalidNotificationEventUuids, getCompanyId());
531 }
532
533 return notificationEvents;
534 }
535
536 protected void doInit() {
537 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
538 return;
539 }
540
541 List<UserNotificationEvent> userNotificationEvents =
542 UserNotificationEventLocalServiceUtil.
543 getDeliveredUserNotificationEvents(getUserId(), false);
544
545 Map<String, NotificationEvent> unconfirmedNotificationEvents =
546 _getUnconfirmedNotificationEvents();
547
548 List<String> invalidNotificationEventUuids = new ArrayList<>(
549 unconfirmedNotificationEvents.size());
550
551 long currentTime = System.currentTimeMillis();
552
553 for (UserNotificationEvent persistedNotificationEvent :
554 userNotificationEvents) {
555
556 try {
557 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
558 persistedNotificationEvent.getPayload());
559
560 NotificationEvent notificationEvent =
561 NotificationEventFactoryUtil.createNotificationEvent(
562 persistedNotificationEvent.getTimestamp(),
563 persistedNotificationEvent.getType(),
564 payloadJSONObject);
565
566 notificationEvent.setDeliveryRequired(
567 persistedNotificationEvent.getDeliverBy());
568
569 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
570
571 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
572 invalidNotificationEventUuids.add(
573 notificationEvent.getUuid());
574 }
575 else {
576 unconfirmedNotificationEvents.put(
577 notificationEvent.getUuid(), notificationEvent);
578 }
579 }
580 catch (JSONException jsone) {
581 _log.error(jsone, jsone);
582
583 invalidNotificationEventUuids.add(
584 persistedNotificationEvent.getUuid());
585 }
586 }
587
588 if (!invalidNotificationEventUuids.isEmpty()) {
589 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
590 invalidNotificationEventUuids, getCompanyId());
591 }
592 }
593
594 protected void doStoreNotificationEvent(
595 NotificationEvent notificationEvent, long currentTime) {
596
597 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
598 return;
599 }
600
601 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
602 notificationEvent.isDeliveryRequired()) {
603
604 Map<String, NotificationEvent> unconfirmedNotificationEvents =
605 _getUnconfirmedNotificationEvents();
606
607 unconfirmedNotificationEvents.put(
608 notificationEvent.getUuid(), notificationEvent);
609 }
610 else {
611 TreeSet<NotificationEvent> notificationEvents =
612 _getNotificationEvents();
613
614 notificationEvents.add(notificationEvent);
615
616 if (notificationEvents.size() >
617 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
618
619 NotificationEvent firstNotificationEvent =
620 notificationEvents.first();
621
622 notificationEvents.remove(firstNotificationEvent);
623 }
624 }
625 }
626
627 protected boolean isRemoveNotificationEvent(
628 NotificationEvent notificationEvent, long currentTime) {
629
630 if ((notificationEvent.getDeliverBy() != 0) &&
631 (notificationEvent.getDeliverBy() <= currentTime)) {
632
633 return true;
634 }
635 else {
636 return false;
637 }
638 }
639
640 private TreeSet<NotificationEvent> _getNotificationEvents() {
641 if (_notificationEvents == null) {
642 _notificationEvents = new TreeSet<>(_comparator);
643 }
644
645 return _notificationEvents;
646 }
647
648 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
649 if (_unconfirmedNotificationEvents == null) {
650 _unconfirmedNotificationEvents = new LinkedHashMap<>();
651 }
652
653 return _unconfirmedNotificationEvents;
654 }
655
656 private static final Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
657
658 private static final Comparator<NotificationEvent> _comparator =
659 new NotificationEventComparator();
660
661 private TreeSet<NotificationEvent> _notificationEvents;
662 private final ReentrantLock _reentrantLock = new ReentrantLock();
663 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
664
665 }