001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.model.CompanyConstants;
024 import com.liferay.portal.kernel.model.UserNotificationEvent;
025 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
026 import com.liferay.portal.kernel.notifications.ChannelException;
027 import com.liferay.portal.kernel.notifications.NotificationEvent;
028 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
029 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
030 import com.liferay.portal.kernel.service.UserNotificationEventLocalServiceUtil;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.util.ArrayList;
034 import java.util.Collection;
035 import java.util.Comparator;
036 import java.util.HashSet;
037 import java.util.Iterator;
038 import java.util.LinkedHashMap;
039 import java.util.List;
040 import java.util.Map;
041 import java.util.Set;
042 import java.util.TreeSet;
043
044
049 public class ChannelImpl extends BaseChannelImpl {
050
/**
 * Creates a channel bound to {@link CompanyConstants#SYSTEM} and the user
 * ID <code>0</code>.
 */
public ChannelImpl() {
	this(CompanyConstants.SYSTEM, 0L);
}
054
/**
 * Creates a channel for the given company and user.
 *
 * @param companyId the company ID handed to the base channel
 * @param userId the user ID handed to the base channel
 */
public ChannelImpl(long companyId, long userId) {
	super(companyId, userId);
}
058
059 @Override
060 public void confirmDelivery(Collection<String> notificationEventUuids)
061 throws ChannelException {
062
063 confirmDelivery(notificationEventUuids, false);
064 }
065
066 @Override
067 public void confirmDelivery(
068 Collection<String> notificationEventUuids, boolean archive)
069 throws ChannelException {
070
071 lock.lock();
072
073 try {
074 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
075 if (archive) {
076 UserNotificationEventLocalServiceUtil.
077 updateUserNotificationEvents(
078 notificationEventUuids, getCompanyId(), archive);
079 }
080 else {
081 UserNotificationEventLocalServiceUtil.
082 deleteUserNotificationEvents(
083 notificationEventUuids, getCompanyId());
084 }
085 }
086
087 for (String notificationEventUuid : notificationEventUuids) {
088 Map<String, NotificationEvent> unconfirmedNotificationEvents =
089 _getUnconfirmedNotificationEvents();
090
091 unconfirmedNotificationEvents.remove(notificationEventUuid);
092 }
093 }
094 catch (Exception e) {
095 throw new ChannelException(
096 "Unable to confirm delivery for user " + getUserId(), e);
097 }
098 finally {
099 lock.unlock();
100 }
101 }
102
103 @Override
104 public void confirmDelivery(String notificationEventUuid)
105 throws ChannelException {
106
107 confirmDelivery(notificationEventUuid, false);
108 }
109
110 @Override
111 public void confirmDelivery(String notificationEventUuid, boolean archive)
112 throws ChannelException {
113
114 lock.lock();
115
116 try {
117 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
118 if (archive) {
119 UserNotificationEventLocalServiceUtil.
120 updateUserNotificationEvent(
121 notificationEventUuid, getCompanyId(), archive);
122 }
123 else {
124 UserNotificationEventLocalServiceUtil.
125 deleteUserNotificationEvent(
126 notificationEventUuid, getCompanyId());
127 }
128 }
129
130 Map<String, NotificationEvent> unconfirmedNotificationEvents =
131 _getUnconfirmedNotificationEvents();
132
133 unconfirmedNotificationEvents.remove(notificationEventUuid);
134 }
135 catch (Exception e) {
136 throw new ChannelException(
137 "Unable to confirm delivery for " + notificationEventUuid, e);
138 }
139 finally {
140 lock.unlock();
141 }
142 }
143
144 @Override
145 public void deleteUserNotificiationEvent(String notificationEventUuid)
146 throws ChannelException {
147
148 lock.lock();
149
150 try {
151 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
152 notificationEventUuid, getCompanyId());
153
154 Map<String, NotificationEvent> unconfirmedNotificationEvents =
155 _getUnconfirmedNotificationEvents();
156
157 unconfirmedNotificationEvents.remove(notificationEventUuid);
158 }
159 catch (Exception e) {
160 throw new ChannelException(
161 "Unable to delete event " + notificationEventUuid, e);
162 }
163 finally {
164 lock.unlock();
165 }
166 }
167
168 @Override
169 public void deleteUserNotificiationEvents(
170 Collection<String> notificationEventUuids)
171 throws ChannelException {
172
173 lock.lock();
174
175 try {
176 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
177 notificationEventUuids, getCompanyId());
178
179 for (String notificationEventUuid : notificationEventUuids) {
180 Map<String, NotificationEvent> unconfirmedNotificationEvents =
181 _getUnconfirmedNotificationEvents();
182
183 unconfirmedNotificationEvents.remove(notificationEventUuid);
184 }
185 }
186 catch (Exception e) {
187 throw new ChannelException(
188 "Unable to delete events for user " + getUserId(), e);
189 }
190 finally {
191 lock.unlock();
192 }
193 }
194
195 @Override
196 public void flush() {
197 lock.lock();
198
199 try {
200 if (_notificationEvents != null) {
201 _notificationEvents.clear();
202 }
203 }
204 finally {
205 lock.unlock();
206 }
207 }
208
209 @Override
210 public void flush(long timestamp) {
211 lock.lock();
212
213 try {
214 if (_notificationEvents == null) {
215 return;
216 }
217
218 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
219
220 while (itr.hasNext()) {
221 NotificationEvent notificationEvent = itr.next();
222
223 if (notificationEvent.getTimestamp() < timestamp) {
224 itr.remove();
225 }
226 }
227 }
228 finally {
229 lock.unlock();
230 }
231 }
232
233 @Override
234 public List<NotificationEvent> getNotificationEvents(boolean flush)
235 throws ChannelException {
236
237 lock.lock();
238
239 try {
240 return doGetNotificationEvents(flush);
241 }
242 catch (ChannelException ce) {
243 throw ce;
244 }
245 catch (Exception e) {
246 throw new ChannelException(e);
247 }
248 finally {
249 lock.unlock();
250 }
251 }
252
253 @Override
254 public void init() throws ChannelException {
255 lock.lock();
256
257 try {
258 doInit();
259 }
260 catch (SystemException se) {
261 throw new ChannelException(
262 "Unable to init channel " + getUserId(), se);
263 }
264 finally {
265 lock.unlock();
266 }
267 }
268
269 @Override
270 public void removeTransientNotificationEvents(
271 Collection<NotificationEvent> notificationEvents) {
272
273 lock.lock();
274
275 try {
276 if (_notificationEvents != null) {
277 _notificationEvents.removeAll(notificationEvents);
278 }
279 }
280 finally {
281 lock.unlock();
282 }
283 }
284
285 @Override
286 public void removeTransientNotificationEventsByUuid(
287 Collection<String> notificationEventUuids) {
288
289 Set<String> notificationEventUuidsSet = new HashSet<>(
290 notificationEventUuids);
291
292 lock.lock();
293
294 try {
295 if (_notificationEvents == null) {
296 return;
297 }
298
299 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
300
301 while (itr.hasNext()) {
302 NotificationEvent notificationEvent = itr.next();
303
304 if (notificationEventUuidsSet.contains(
305 notificationEvent.getUuid())) {
306
307 itr.remove();
308 }
309 }
310 }
311 finally {
312 lock.unlock();
313 }
314 }
315
316 @Override
317 public void sendNotificationEvent(NotificationEvent notificationEvent)
318 throws ChannelException {
319
320 lock.lock();
321
322 try {
323 long currentTime = System.currentTimeMillis();
324
325 doStoreNotificationEvent(notificationEvent, currentTime);
326
327 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
328 notificationEvent.isDeliveryRequired()) {
329
330 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
331 getUserId(), notificationEvent);
332 }
333
334 notifyChannelListeners();
335 }
336 catch (Exception e) {
337 throw new ChannelException("Unable to send event", e);
338 }
339 finally {
340 lock.unlock();
341 }
342 }
343
344 @Override
345 public void sendNotificationEvents(
346 Collection<NotificationEvent> notificationEvents)
347 throws ChannelException {
348
349 lock.lock();
350
351 try {
352 long currentTime = System.currentTimeMillis();
353
354 List<NotificationEvent> persistedNotificationEvents =
355 new ArrayList<>(notificationEvents.size());
356
357 for (NotificationEvent notificationEvent : notificationEvents) {
358 doStoreNotificationEvent(notificationEvent, currentTime);
359
360 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
361 notificationEvent.isDeliveryRequired()) {
362
363 persistedNotificationEvents.add(notificationEvent);
364 }
365 }
366
367 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
368 !persistedNotificationEvents.isEmpty()) {
369
370 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
371 getUserId(), persistedNotificationEvents);
372 }
373
374 notifyChannelListeners();
375 }
376 catch (Exception e) {
377 throw new ChannelException("Unable to send event", e);
378 }
379 finally {
380 lock.unlock();
381 }
382 }
383
384 @Override
385 public void storeNotificationEvent(
386 NotificationEvent notificationEvent, long currentTime) {
387
388 lock.lock();
389
390 try {
391 doStoreNotificationEvent(notificationEvent, currentTime);
392 }
393 finally {
394 lock.unlock();
395 }
396 }
397
398 @Override
399 protected void doCleanUp() throws Exception {
400 lock.lock();
401
402 try {
403 long currentTime = System.currentTimeMillis();
404
405 TreeSet<NotificationEvent> notificationEvents =
406 _getNotificationEvents();
407
408 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
409
410 while (itr1.hasNext()) {
411 NotificationEvent notificationEvent = itr1.next();
412
413 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
414 itr1.remove();
415 }
416 }
417
418 Map<String, NotificationEvent> unconfirmedNotificationEvents =
419 _getUnconfirmedNotificationEvents();
420
421 List<String> invalidNotificationEventUuids = new ArrayList<>(
422 unconfirmedNotificationEvents.size());
423
424 Set<Map.Entry<String, NotificationEvent>>
425 unconfirmedNotificationEventsSet =
426 unconfirmedNotificationEvents.entrySet();
427
428 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
429 unconfirmedNotificationEventsSet.iterator();
430
431 while (itr2.hasNext()) {
432 Map.Entry<String, NotificationEvent> entry = itr2.next();
433
434 NotificationEvent notificationEvent = entry.getValue();
435
436 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
437 invalidNotificationEventUuids.add(entry.getKey());
438
439 itr2.remove();
440 }
441 }
442
443 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
444 !invalidNotificationEventUuids.isEmpty()) {
445
446 UserNotificationEventLocalServiceUtil.
447 deleteUserNotificationEvents(
448 invalidNotificationEventUuids, getCompanyId());
449 }
450 }
451 catch (Exception e) {
452 throw new ChannelException(
453 "Unable to clean up channel " + getUserId(), e);
454 }
455 finally {
456 lock.unlock();
457 }
458 }
459
460 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
461 throws Exception {
462
463 long currentTime = System.currentTimeMillis();
464
465 TreeSet<NotificationEvent> notificationEventsSet =
466 _getNotificationEvents();
467
468 Map<String, NotificationEvent> unconfirmedNotificationEvents =
469 _getUnconfirmedNotificationEvents();
470
471 List<NotificationEvent> notificationEvents =
472 new ArrayList<NotificationEvent>(
473 notificationEventsSet.size() +
474 unconfirmedNotificationEvents.size());
475
476 for (NotificationEvent notificationEvent : notificationEventsSet) {
477 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
478 break;
479 }
480 else {
481 notificationEvents.add(notificationEvent);
482 }
483 }
484
485 if (flush) {
486 notificationEventsSet.clear();
487 }
488 else if (notificationEventsSet.size() != notificationEvents.size()) {
489 notificationEventsSet.retainAll(notificationEvents);
490 }
491
492 List<String> invalidNotificationEventUuids = new ArrayList<>(
493 unconfirmedNotificationEvents.size());
494
495 Set<Map.Entry<String, NotificationEvent>>
496 unconfirmedNotificationEventsSet =
497 unconfirmedNotificationEvents.entrySet();
498
499 Iterator<Map.Entry<String, NotificationEvent>> itr =
500 unconfirmedNotificationEventsSet.iterator();
501
502 while (itr.hasNext()) {
503 Map.Entry<String, NotificationEvent> entry = itr.next();
504
505 NotificationEvent notificationEvent = entry.getValue();
506
507 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
508 !notificationEvent.isArchived()) {
509
510 invalidNotificationEventUuids.add(notificationEvent.getUuid());
511
512 itr.remove();
513 }
514 else {
515 notificationEvents.add(entry.getValue());
516 }
517 }
518
519 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
520 !invalidNotificationEventUuids.isEmpty()) {
521
522 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
523 invalidNotificationEventUuids, getCompanyId());
524 }
525
526 return notificationEvents;
527 }
528
529 protected void doInit() {
530 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
531 return;
532 }
533
534 List<UserNotificationEvent> userNotificationEvents =
535 UserNotificationEventLocalServiceUtil.
536 getDeliveredUserNotificationEvents(getUserId(), false);
537
538 Map<String, NotificationEvent> unconfirmedNotificationEvents =
539 _getUnconfirmedNotificationEvents();
540
541 List<String> invalidNotificationEventUuids = new ArrayList<>(
542 unconfirmedNotificationEvents.size());
543
544 long currentTime = System.currentTimeMillis();
545
546 for (UserNotificationEvent persistedNotificationEvent :
547 userNotificationEvents) {
548
549 try {
550 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
551 persistedNotificationEvent.getPayload());
552
553 NotificationEvent notificationEvent =
554 NotificationEventFactoryUtil.createNotificationEvent(
555 persistedNotificationEvent.getTimestamp(),
556 persistedNotificationEvent.getType(),
557 payloadJSONObject);
558
559 notificationEvent.setDeliveryRequired(
560 persistedNotificationEvent.getDeliverBy());
561
562 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
563
564 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
565 invalidNotificationEventUuids.add(
566 notificationEvent.getUuid());
567 }
568 else {
569 unconfirmedNotificationEvents.put(
570 notificationEvent.getUuid(), notificationEvent);
571 }
572 }
573 catch (JSONException jsone) {
574 _log.error(jsone, jsone);
575
576 invalidNotificationEventUuids.add(
577 persistedNotificationEvent.getUuid());
578 }
579 }
580
581 if (!invalidNotificationEventUuids.isEmpty()) {
582 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
583 invalidNotificationEventUuids, getCompanyId());
584 }
585 }
586
587 protected void doStoreNotificationEvent(
588 NotificationEvent notificationEvent, long currentTime) {
589
590 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
591 return;
592 }
593
594 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
595 notificationEvent.isDeliveryRequired()) {
596
597 Map<String, NotificationEvent> unconfirmedNotificationEvents =
598 _getUnconfirmedNotificationEvents();
599
600 unconfirmedNotificationEvents.put(
601 notificationEvent.getUuid(), notificationEvent);
602 }
603 else {
604 TreeSet<NotificationEvent> notificationEvents =
605 _getNotificationEvents();
606
607 notificationEvents.add(notificationEvent);
608
609 if (notificationEvents.size() >
610 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
611
612 NotificationEvent firstNotificationEvent =
613 notificationEvents.first();
614
615 notificationEvents.remove(firstNotificationEvent);
616 }
617 }
618 }
619
620 protected boolean isRemoveNotificationEvent(
621 NotificationEvent notificationEvent, long currentTime) {
622
623 if ((notificationEvent.getDeliverBy() != 0) &&
624 (notificationEvent.getDeliverBy() <= currentTime)) {
625
626 return true;
627 }
628 else {
629 return false;
630 }
631 }
632
633 private TreeSet<NotificationEvent> _getNotificationEvents() {
634 if (_notificationEvents == null) {
635 _notificationEvents = new TreeSet<>(_comparator);
636 }
637
638 return _notificationEvents;
639 }
640
641 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
642 if (_unconfirmedNotificationEvents == null) {
643 _unconfirmedNotificationEvents = new LinkedHashMap<>();
644 }
645
646 return _unconfirmedNotificationEvents;
647 }
648
649 private static final Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
650
651 private static final Comparator<NotificationEvent> _comparator =
652 new NotificationEventComparator();
653
654 private TreeSet<NotificationEvent> _notificationEvents;
655 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
656
657 }