001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 @Override
062 public Channel clone(long companyId, long userId) {
063 return new ChannelImpl(companyId, userId);
064 }
065
066 @Override
067 public void confirmDelivery(Collection<String> notificationEventUuids)
068 throws ChannelException {
069
070 confirmDelivery(notificationEventUuids, false);
071 }
072
073 @Override
074 public void confirmDelivery(
075 Collection<String> notificationEventUuids, boolean archive)
076 throws ChannelException {
077
078 _reentrantLock.lock();
079
080 try {
081 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
082 if (archive) {
083 UserNotificationEventLocalServiceUtil.
084 updateUserNotificationEvents(
085 notificationEventUuids, getCompanyId(), archive);
086 }
087 else {
088 UserNotificationEventLocalServiceUtil.
089 deleteUserNotificationEvents(
090 notificationEventUuids, getCompanyId());
091 }
092 }
093
094 for (String notificationEventUuid : notificationEventUuids) {
095 Map<String, NotificationEvent> unconfirmedNotificationEvents =
096 _getUnconfirmedNotificationEvents();
097
098 unconfirmedNotificationEvents.remove(notificationEventUuid);
099 }
100 }
101 catch (Exception e) {
102 throw new ChannelException(
103 "Unable to confirm delivery for user " + getUserId() , e);
104 }
105 finally {
106 _reentrantLock.unlock();
107 }
108 }
109
110 @Override
111 public void confirmDelivery(String notificationEventUuid)
112 throws ChannelException {
113
114 confirmDelivery(notificationEventUuid, false);
115 }
116
117 @Override
118 public void confirmDelivery(String notificationEventUuid, boolean archive)
119 throws ChannelException {
120
121 _reentrantLock.lock();
122
123 try {
124 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
125 if (archive) {
126 UserNotificationEventLocalServiceUtil.
127 updateUserNotificationEvent(
128 notificationEventUuid, getCompanyId(), archive);
129 }
130 else {
131 UserNotificationEventLocalServiceUtil.
132 deleteUserNotificationEvent(
133 notificationEventUuid, getCompanyId());
134 }
135 }
136
137 Map<String, NotificationEvent> unconfirmedNotificationEvents =
138 _getUnconfirmedNotificationEvents();
139
140 unconfirmedNotificationEvents.remove(notificationEventUuid);
141 }
142 catch (Exception e) {
143 throw new ChannelException(
144 "Unable to confirm delivery for " + notificationEventUuid , e);
145 }
146 finally {
147 _reentrantLock.unlock();
148 }
149 }
150
151 @Override
152 public void deleteUserNotificiationEvent(String notificationEventUuid)
153 throws ChannelException {
154
155 _reentrantLock.lock();
156
157 try {
158 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
159 notificationEventUuid, getCompanyId());
160
161 Map<String, NotificationEvent> unconfirmedNotificationEvents =
162 _getUnconfirmedNotificationEvents();
163
164 unconfirmedNotificationEvents.remove(notificationEventUuid);
165 }
166 catch (Exception e) {
167 throw new ChannelException(
168 "Unable to delete event " + notificationEventUuid , e);
169 }
170 finally {
171 _reentrantLock.unlock();
172 }
173 }
174
175 @Override
176 public void deleteUserNotificiationEvents(
177 Collection<String> notificationEventUuids)
178 throws ChannelException {
179
180 _reentrantLock.lock();
181
182 try {
183 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
184 notificationEventUuids, getCompanyId());
185
186 for (String notificationEventUuid : notificationEventUuids) {
187 Map<String, NotificationEvent> unconfirmedNotificationEvents =
188 _getUnconfirmedNotificationEvents();
189
190 unconfirmedNotificationEvents.remove(notificationEventUuid);
191 }
192 }
193 catch (Exception e) {
194 throw new ChannelException(
195 "Unable to delete events for user " + getUserId() , e);
196 }
197 finally {
198 _reentrantLock.unlock();
199 }
200 }
201
202 @Override
203 public void flush() {
204 _reentrantLock.lock();
205
206 try {
207 if (_notificationEvents != null) {
208 _notificationEvents.clear();
209 }
210 }
211 finally {
212 _reentrantLock.unlock();
213 }
214 }
215
216 @Override
217 public void flush(long timestamp) {
218 _reentrantLock.lock();
219
220 try {
221 if (_notificationEvents == null) {
222 return;
223 }
224
225 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
226
227 while (itr.hasNext()) {
228 NotificationEvent notificationEvent = itr.next();
229
230 if (notificationEvent.getTimestamp() < timestamp) {
231 itr.remove();
232 }
233 }
234 }
235 finally {
236 _reentrantLock.unlock();
237 }
238 }
239
240 @Override
241 public List<NotificationEvent> getNotificationEvents(boolean flush)
242 throws ChannelException {
243
244 _reentrantLock.lock();
245
246 try {
247 return doGetNotificationEvents(flush);
248 }
249 catch (ChannelException ce) {
250 throw ce;
251 }
252 catch (Exception e) {
253 throw new ChannelException(e);
254 }
255 finally {
256 _reentrantLock.unlock();
257 }
258 }
259
260 @Override
261 public void init() throws ChannelException {
262 _reentrantLock.lock();
263
264 try {
265 doInit();
266 }
267 catch (SystemException se) {
268 throw new ChannelException(
269 "Unable to init channel " + getUserId(), se);
270 }
271 finally {
272 _reentrantLock.unlock();
273 }
274 }
275
276 @Override
277 public void removeTransientNotificationEvents(
278 Collection<NotificationEvent> notificationEvents) {
279
280 _reentrantLock.lock();
281
282 try {
283 if (_notificationEvents != null) {
284 _notificationEvents.removeAll(notificationEvents);
285 }
286 }
287 finally {
288 _reentrantLock.unlock();
289 }
290 }
291
292 @Override
293 public void removeTransientNotificationEventsByUuid(
294 Collection<String> notificationEventUuids) {
295
296 Set<String> notificationEventUuidsSet = new HashSet<String>(
297 notificationEventUuids);
298
299 _reentrantLock.lock();
300
301 try {
302 if (_notificationEvents == null) {
303 return;
304 }
305
306 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
307
308 while (itr.hasNext()) {
309 NotificationEvent notificationEvent = itr.next();
310
311 if (notificationEventUuidsSet.contains(
312 notificationEvent.getUuid())) {
313
314 itr.remove();
315 }
316 }
317 }
318 finally {
319 _reentrantLock.unlock();
320 }
321 }
322
323 @Override
324 public void sendNotificationEvent(NotificationEvent notificationEvent)
325 throws ChannelException {
326
327 _reentrantLock.lock();
328
329 try {
330 long currentTime = System.currentTimeMillis();
331
332 storeNotificationEvent(notificationEvent, currentTime);
333
334 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
335 notificationEvent.isDeliveryRequired()) {
336
337 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
338 getUserId(), notificationEvent);
339 }
340 }
341 catch (Exception e) {
342 throw new ChannelException("Unable to send event", e);
343 }
344 finally {
345 _reentrantLock.unlock();
346 }
347
348 notifyChannelListeners();
349 }
350
351 @Override
352 public void sendNotificationEvents(
353 Collection<NotificationEvent> notificationEvents)
354 throws ChannelException {
355
356 _reentrantLock.lock();
357
358 try {
359 long currentTime = System.currentTimeMillis();
360
361 List<NotificationEvent> persistedNotificationEvents =
362 new ArrayList<NotificationEvent>(notificationEvents.size());
363
364 for (NotificationEvent notificationEvent : notificationEvents) {
365 storeNotificationEvent(notificationEvent, currentTime);
366
367 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
368 notificationEvent.isDeliveryRequired()) {
369
370 persistedNotificationEvents.add(notificationEvent);
371 }
372 }
373
374 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
375 !persistedNotificationEvents.isEmpty()) {
376
377 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
378 getUserId(), persistedNotificationEvents);
379 }
380 }
381 catch (Exception e) {
382 throw new ChannelException("Unable to send event", e);
383 }
384 finally {
385 _reentrantLock.unlock();
386 }
387
388 notifyChannelListeners();
389 }
390
391 @Override
392 protected void doCleanUp() throws Exception {
393 _reentrantLock.lock();
394
395 try {
396 long currentTime = System.currentTimeMillis();
397
398 TreeSet<NotificationEvent> notificationEvents =
399 _getNotificationEvents();
400
401 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
402
403 while (itr1.hasNext()) {
404 NotificationEvent notificationEvent = itr1.next();
405
406 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
407 itr1.remove();
408 }
409 }
410
411 Map<String, NotificationEvent> unconfirmedNotificationEvents =
412 _getUnconfirmedNotificationEvents();
413
414 List<String> invalidNotificationEventUuids = new ArrayList<String>(
415 unconfirmedNotificationEvents.size());
416
417 Set<Map.Entry<String, NotificationEvent>>
418 unconfirmedNotificationEventsSet =
419 unconfirmedNotificationEvents.entrySet();
420
421 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
422 unconfirmedNotificationEventsSet.iterator();
423
424 while (itr2.hasNext()) {
425 Map.Entry<String, NotificationEvent> entry = itr2.next();
426
427 NotificationEvent notificationEvent = entry.getValue();
428
429 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
430 invalidNotificationEventUuids.add(entry.getKey());
431
432 itr2.remove();
433 }
434 }
435
436 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
437 !invalidNotificationEventUuids.isEmpty()) {
438
439 UserNotificationEventLocalServiceUtil.
440 deleteUserNotificationEvents(
441 invalidNotificationEventUuids, getCompanyId());
442 }
443 }
444 catch (Exception e) {
445 throw new ChannelException(
446 "Unable to clean up channel " + getUserId(), e);
447 }
448 finally {
449 _reentrantLock.unlock();
450 }
451 }
452
453 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
454 throws Exception {
455
456 long currentTime = System.currentTimeMillis();
457
458 TreeSet<NotificationEvent> notificationEventsSet =
459 _getNotificationEvents();
460
461 Map<String, NotificationEvent> unconfirmedNotificationEvents =
462 _getUnconfirmedNotificationEvents();
463
464 List<NotificationEvent> notificationEvents =
465 new ArrayList<NotificationEvent>(
466 notificationEventsSet.size() +
467 unconfirmedNotificationEvents.size());
468
469 for (NotificationEvent notificationEvent : notificationEventsSet) {
470 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
471 break;
472 }
473 else {
474 notificationEvents.add(notificationEvent);
475 }
476 }
477
478 if (flush) {
479 notificationEventsSet.clear();
480 }
481 else if (notificationEventsSet.size() != notificationEvents.size()) {
482 notificationEventsSet.retainAll(notificationEvents);
483 }
484
485 List<String> invalidNotificationEventUuids = new ArrayList<String>(
486 unconfirmedNotificationEvents.size());
487
488 Set<Map.Entry<String, NotificationEvent>>
489 unconfirmedNotificationEventsSet =
490 unconfirmedNotificationEvents.entrySet();
491
492 Iterator<Map.Entry<String, NotificationEvent>> itr =
493 unconfirmedNotificationEventsSet.iterator();
494
495 while (itr.hasNext()) {
496 Map.Entry<String, NotificationEvent> entry = itr.next();
497
498 NotificationEvent notificationEvent = entry.getValue();
499
500 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
501 !notificationEvent.isArchived()) {
502
503 invalidNotificationEventUuids.add(notificationEvent.getUuid());
504
505 itr.remove();
506 }
507 else {
508 notificationEvents.add(entry.getValue());
509 }
510 }
511
512 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
513 !invalidNotificationEventUuids.isEmpty()) {
514
515 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
516 invalidNotificationEventUuids, getCompanyId());
517 }
518
519 return notificationEvents;
520 }
521
522 protected void doInit() throws SystemException {
523 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
524 return;
525 }
526
527 List<UserNotificationEvent> userNotificationEvents =
528 UserNotificationEventLocalServiceUtil.
529 getDeliveredUserNotificationEvents(getUserId(), false);
530
531 Map<String, NotificationEvent> unconfirmedNotificationEvents =
532 _getUnconfirmedNotificationEvents();
533
534 List<String> invalidNotificationEventUuids = new ArrayList<String>(
535 unconfirmedNotificationEvents.size());
536
537 long currentTime = System.currentTimeMillis();
538
539 for (UserNotificationEvent persistedNotificationEvent :
540 userNotificationEvents) {
541
542 try {
543 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
544 persistedNotificationEvent.getPayload());
545
546 NotificationEvent notificationEvent =
547 NotificationEventFactoryUtil.createNotificationEvent(
548 persistedNotificationEvent.getTimestamp(),
549 persistedNotificationEvent.getType(),
550 payloadJSONObject);
551
552 notificationEvent.setDeliveryRequired(
553 persistedNotificationEvent.getDeliverBy());
554
555 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
556
557 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
558 invalidNotificationEventUuids.add(
559 notificationEvent.getUuid());
560 }
561 else {
562 unconfirmedNotificationEvents.put(
563 notificationEvent.getUuid(), notificationEvent);
564 }
565 }
566 catch (JSONException jsone) {
567 _log.error(jsone, jsone);
568
569 invalidNotificationEventUuids.add(
570 persistedNotificationEvent.getUuid());
571 }
572 }
573
574 if (!invalidNotificationEventUuids.isEmpty()) {
575 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
576 invalidNotificationEventUuids, getCompanyId());
577 }
578 }
579
580 protected boolean isRemoveNotificationEvent(
581 NotificationEvent notificationEvent, long currentTime) {
582
583 if ((notificationEvent.getDeliverBy() != 0) &&
584 (notificationEvent.getDeliverBy() <= currentTime)) {
585
586 return true;
587 }
588 else {
589 return false;
590 }
591 }
592
593 protected void storeNotificationEvent(
594 NotificationEvent notificationEvent, long currentTime) {
595
596 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
597 return;
598 }
599
600 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
601 notificationEvent.isDeliveryRequired()) {
602
603 Map<String, NotificationEvent> unconfirmedNotificationEvents =
604 _getUnconfirmedNotificationEvents();
605
606 unconfirmedNotificationEvents.put(
607 notificationEvent.getUuid(), notificationEvent);
608 }
609 else {
610 TreeSet<NotificationEvent> notificationEvents =
611 _getNotificationEvents();
612
613 notificationEvents.add(notificationEvent);
614
615 if (notificationEvents.size() >
616 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
617
618 NotificationEvent firstNotificationEvent =
619 notificationEvents.first();
620
621 notificationEvents.remove(firstNotificationEvent);
622 }
623 }
624 }
625
626 private TreeSet<NotificationEvent> _getNotificationEvents() {
627 if (_notificationEvents == null) {
628 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
629 }
630
631 return _notificationEvents;
632 }
633
634 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
635 if (_unconfirmedNotificationEvents == null) {
636 _unconfirmedNotificationEvents =
637 new HashMap<String, NotificationEvent>();
638 }
639
640 return _unconfirmedNotificationEvents;
641 }
642
643 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
644
645 private static Comparator<NotificationEvent> _comparator =
646 new NotificationEventComparator();
647
648 private TreeSet<NotificationEvent> _notificationEvents;
649 private ReentrantLock _reentrantLock = new ReentrantLock();
650 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
651
652 }