001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 public Channel clone(long companyId, long userId) {
062 return new ChannelImpl(companyId, userId);
063 }
064
065 public void confirmDelivery(Collection<String> notificationEventUuids)
066 throws ChannelException {
067
068 confirmDelivery(notificationEventUuids, false);
069 }
070
071 public void confirmDelivery(
072 Collection<String> notificationEventUuids, boolean archive)
073 throws ChannelException {
074
075 _reentrantLock.lock();
076
077 try {
078 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
079 if (archive) {
080 UserNotificationEventLocalServiceUtil.
081 updateUserNotificationEvents(
082 notificationEventUuids, archive);
083 }
084 else {
085 UserNotificationEventLocalServiceUtil.
086 deleteUserNotificationEvents(notificationEventUuids);
087 }
088 }
089
090 for (String notificationEventUuid : notificationEventUuids) {
091 Map<String, NotificationEvent> unconfirmedNotificationEvents =
092 _getUnconfirmedNotificationEvents();
093
094 unconfirmedNotificationEvents.remove(notificationEventUuid);
095 }
096 }
097 catch (Exception e) {
098 throw new ChannelException(
099 "Unable to confirm delivery for user " + getUserId() , e);
100 }
101 finally {
102 _reentrantLock.unlock();
103 }
104 }
105
106 public void confirmDelivery(String notificationEventUuid)
107 throws ChannelException {
108
109 confirmDelivery(notificationEventUuid, false);
110 }
111
112 public void confirmDelivery(String notificationEventUuid, boolean archive)
113 throws ChannelException {
114
115 _reentrantLock.lock();
116
117 try {
118 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
119 if (archive) {
120 UserNotificationEventLocalServiceUtil.
121 updateUserNotificationEvent(
122 notificationEventUuid, archive);
123 }
124 else {
125 UserNotificationEventLocalServiceUtil.
126 deleteUserNotificationEvent(notificationEventUuid);
127 }
128 }
129
130 Map<String, NotificationEvent> unconfirmedNotificationEvents =
131 _getUnconfirmedNotificationEvents();
132
133 unconfirmedNotificationEvents.remove(notificationEventUuid);
134 }
135 catch (Exception e) {
136 throw new ChannelException(
137 "Unable to confirm delivery for " + notificationEventUuid , e);
138 }
139 finally {
140 _reentrantLock.unlock();
141 }
142 }
143
144 public void deleteUserNotificiationEvent(String notificationEventUuid)
145 throws ChannelException {
146
147 _reentrantLock.lock();
148
149 try {
150 UserNotificationEventLocalServiceUtil.
151 deleteUserNotificationEvent(notificationEventUuid);
152
153 Map<String, NotificationEvent> unconfirmedNotificationEvents =
154 _getUnconfirmedNotificationEvents();
155
156 unconfirmedNotificationEvents.remove(notificationEventUuid);
157 }
158 catch (Exception e) {
159 throw new ChannelException(
160 "Unable to delete event " + notificationEventUuid , e);
161 }
162 finally {
163 _reentrantLock.unlock();
164 }
165 }
166
167 public void deleteUserNotificiationEvents(
168 Collection<String> notificationEventUuids)
169 throws ChannelException {
170
171 _reentrantLock.lock();
172
173 try {
174 UserNotificationEventLocalServiceUtil.
175 deleteUserNotificationEvents(notificationEventUuids);
176
177 for (String notificationEventUuid : notificationEventUuids) {
178 Map<String, NotificationEvent> unconfirmedNotificationEvents =
179 _getUnconfirmedNotificationEvents();
180
181 unconfirmedNotificationEvents.remove(notificationEventUuid);
182 }
183 }
184 catch (Exception e) {
185 throw new ChannelException(
186 "Unable to delete events for user " + getUserId() , e);
187 }
188 finally {
189 _reentrantLock.unlock();
190 }
191 }
192
193 public void flush() {
194 _reentrantLock.lock();
195
196 try {
197 if (_notificationEvents != null) {
198 _notificationEvents.clear();
199 }
200 }
201 finally {
202 _reentrantLock.unlock();
203 }
204 }
205
206 public void flush(long timestamp) {
207 _reentrantLock.lock();
208
209 try {
210 if (_notificationEvents == null) {
211 return;
212 }
213
214 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
215
216 while (itr.hasNext()) {
217 NotificationEvent notificationEvent = itr.next();
218
219 if (notificationEvent.getTimestamp() < timestamp) {
220 itr.remove();
221 }
222 }
223 }
224 finally {
225 _reentrantLock.unlock();
226 }
227 }
228
229 public List<NotificationEvent> getNotificationEvents(boolean flush)
230 throws ChannelException {
231
232 _reentrantLock.lock();
233
234 try {
235 return doGetNotificationEvents(flush);
236 }
237 catch (ChannelException ce) {
238 throw ce;
239 }
240 catch (Exception e) {
241 throw new ChannelException(e);
242 }
243 finally {
244 _reentrantLock.unlock();
245 }
246 }
247
248 public void init() throws ChannelException {
249 _reentrantLock.lock();
250
251 try {
252 doInit();
253 }
254 catch (SystemException se) {
255 throw new ChannelException(
256 "Unable to init channel " + getUserId(), se);
257 }
258 finally {
259 _reentrantLock.unlock();
260 }
261 }
262
263 public void removeTransientNotificationEvents(
264 Collection<NotificationEvent> notificationEvents) {
265
266 _reentrantLock.lock();
267
268 try {
269 if (_notificationEvents != null) {
270 _notificationEvents.removeAll(notificationEvents);
271 }
272 }
273 finally {
274 _reentrantLock.unlock();
275 }
276 }
277
278 public void removeTransientNotificationEventsByUuid(
279 Collection<String> notificationEventUuids) {
280
281 Set<String> notificationEventUuidsSet = new HashSet<String>(
282 notificationEventUuids);
283
284 _reentrantLock.lock();
285
286 try {
287 if (_notificationEvents == null) {
288 return;
289 }
290
291 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
292
293 while (itr.hasNext()) {
294 NotificationEvent notificationEvent = itr.next();
295
296 if (notificationEventUuidsSet.contains(
297 notificationEvent.getUuid())) {
298
299 itr.remove();
300 }
301 }
302 }
303 finally {
304 _reentrantLock.unlock();
305 }
306 }
307
308 public void sendNotificationEvent(NotificationEvent notificationEvent)
309 throws ChannelException {
310
311 _reentrantLock.lock();
312
313 try {
314 long currentTime = System.currentTimeMillis();
315
316 storeNotificationEvent(notificationEvent, currentTime);
317
318 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
319 notificationEvent.isDeliveryRequired()) {
320
321 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
322 getUserId(), notificationEvent);
323 }
324 }
325 catch (Exception e) {
326 throw new ChannelException("Unable to send event", e);
327 }
328 finally {
329 _reentrantLock.unlock();
330 }
331
332 notifyChannelListeners();
333 }
334
335 public void sendNotificationEvents(
336 Collection<NotificationEvent> notificationEvents)
337 throws ChannelException {
338
339 _reentrantLock.lock();
340
341 try {
342 long currentTime = System.currentTimeMillis();
343
344 List<NotificationEvent> persistedNotificationEvents =
345 new ArrayList<NotificationEvent>(notificationEvents.size());
346
347 for (NotificationEvent notificationEvent : notificationEvents) {
348 storeNotificationEvent(notificationEvent, currentTime);
349
350 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
351 notificationEvent.isDeliveryRequired()) {
352
353 persistedNotificationEvents.add(notificationEvent);
354 }
355 }
356
357 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
358 !persistedNotificationEvents.isEmpty()) {
359
360 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
361 getUserId(), persistedNotificationEvents);
362 }
363 }
364 catch (Exception e) {
365 throw new ChannelException("Unable to send event", e);
366 }
367 finally {
368 _reentrantLock.unlock();
369 }
370
371 notifyChannelListeners();
372 }
373
374 @Override
375 protected void doCleanUp() throws Exception {
376 _reentrantLock.lock();
377
378 try {
379 long currentTime = System.currentTimeMillis();
380
381 TreeSet<NotificationEvent> notificationEvents =
382 _getNotificationEvents();
383
384 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
385
386 while (itr1.hasNext()) {
387 NotificationEvent notificationEvent = itr1.next();
388
389 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
390 itr1.remove();
391 }
392 }
393
394 Map<String, NotificationEvent> unconfirmedNotificationEvents =
395 _getUnconfirmedNotificationEvents();
396
397 List<String> invalidNotificationEventUuids = new ArrayList<String>(
398 unconfirmedNotificationEvents.size());
399
400 Set<Map.Entry<String, NotificationEvent>>
401 unconfirmedNotificationEventsSet =
402 unconfirmedNotificationEvents.entrySet();
403
404 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
405 unconfirmedNotificationEventsSet.iterator();
406
407 while (itr2.hasNext()) {
408 Map.Entry<String, NotificationEvent> entry = itr2.next();
409
410 NotificationEvent notificationEvent = entry.getValue();
411
412 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
413 invalidNotificationEventUuids.add(entry.getKey());
414
415 itr2.remove();
416 }
417 }
418
419 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
420 !invalidNotificationEventUuids.isEmpty()) {
421
422 UserNotificationEventLocalServiceUtil.
423 deleteUserNotificationEvents(invalidNotificationEventUuids);
424 }
425 }
426 catch (Exception e) {
427 throw new ChannelException(
428 "Unable to clean up channel " + getUserId(), e);
429 }
430 finally {
431 _reentrantLock.unlock();
432 }
433 }
434
435 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
436 throws Exception {
437
438 long currentTime = System.currentTimeMillis();
439
440 TreeSet<NotificationEvent> notificationEventsSet =
441 _getNotificationEvents();
442
443 Map<String, NotificationEvent> unconfirmedNotificationEvents =
444 _getUnconfirmedNotificationEvents();
445
446 List<NotificationEvent> notificationEvents =
447 new ArrayList<NotificationEvent>(
448 notificationEventsSet.size() +
449 unconfirmedNotificationEvents.size());
450
451 for (NotificationEvent notificationEvent : notificationEventsSet) {
452 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
453 break;
454 }
455 else {
456 notificationEvents.add(notificationEvent);
457 }
458 }
459
460 if (flush) {
461 notificationEventsSet.clear();
462 }
463 else if (notificationEventsSet.size() != notificationEvents.size()) {
464 notificationEventsSet.retainAll(notificationEvents);
465 }
466
467 List<String> invalidNotificationEventUuids = new ArrayList<String>(
468 unconfirmedNotificationEvents.size());
469
470 Set<Map.Entry<String, NotificationEvent>>
471 unconfirmedNotificationEventsSet =
472 unconfirmedNotificationEvents.entrySet();
473
474 Iterator<Map.Entry<String, NotificationEvent>> itr =
475 unconfirmedNotificationEventsSet.iterator();
476
477 while (itr.hasNext()) {
478 Map.Entry<String, NotificationEvent> entry = itr.next();
479
480 NotificationEvent notificationEvent = entry.getValue();
481
482 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
483 !notificationEvent.isArchived()) {
484
485 invalidNotificationEventUuids.add(notificationEvent.getUuid());
486
487 itr.remove();
488 }
489 else {
490 notificationEvents.add(entry.getValue());
491 }
492 }
493
494 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
495 !invalidNotificationEventUuids.isEmpty()) {
496
497 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
498 invalidNotificationEventUuids);
499 }
500
501 return notificationEvents;
502 }
503
504 protected void doInit() throws SystemException {
505 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
506 return;
507 }
508
509 List<UserNotificationEvent> userNotificationEvents =
510 UserNotificationEventLocalServiceUtil.getUserNotificationEvents(
511 getUserId(), false);
512
513 Map<String, NotificationEvent> unconfirmedNotificationEvents =
514 _getUnconfirmedNotificationEvents();
515
516 List<String> invalidNotificationEventUuids = new ArrayList<String>(
517 unconfirmedNotificationEvents.size());
518
519 long currentTime = System.currentTimeMillis();
520
521 for (UserNotificationEvent persistedNotificationEvent :
522 userNotificationEvents) {
523
524 try {
525 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
526 persistedNotificationEvent.getPayload());
527
528 NotificationEvent notificationEvent =
529 NotificationEventFactoryUtil.createNotificationEvent(
530 persistedNotificationEvent.getTimestamp(),
531 persistedNotificationEvent.getType(),
532 payloadJSONObject);
533
534 notificationEvent.setDeliveryRequired(
535 persistedNotificationEvent.getDeliverBy());
536
537 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
538
539 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
540 invalidNotificationEventUuids.add(
541 notificationEvent.getUuid());
542 }
543 else {
544 unconfirmedNotificationEvents.put(
545 notificationEvent.getUuid(), notificationEvent);
546 }
547 }
548 catch (JSONException jsone) {
549 _log.error(jsone, jsone);
550
551 invalidNotificationEventUuids.add(
552 persistedNotificationEvent.getUuid());
553 }
554 }
555
556 if (!invalidNotificationEventUuids.isEmpty()) {
557 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
558 invalidNotificationEventUuids);
559 }
560 }
561
562 protected boolean isRemoveNotificationEvent(
563 NotificationEvent notificationEvent, long currentTime) {
564
565 if ((notificationEvent.getDeliverBy() != 0) &&
566 (notificationEvent.getDeliverBy() <= currentTime)) {
567
568 return true;
569 }
570 else {
571 return false;
572 }
573 }
574
575 protected void storeNotificationEvent(
576 NotificationEvent notificationEvent, long currentTime) {
577
578 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
579 return;
580 }
581
582 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
583 notificationEvent.isDeliveryRequired()) {
584
585 Map<String, NotificationEvent> unconfirmedNotificationEvents =
586 _getUnconfirmedNotificationEvents();
587
588 unconfirmedNotificationEvents.put(
589 notificationEvent.getUuid(), notificationEvent);
590 }
591 else {
592 TreeSet<NotificationEvent> notificationEvents =
593 _getNotificationEvents();
594
595 notificationEvents.add(notificationEvent);
596
597 if (notificationEvents.size() >
598 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
599
600 NotificationEvent firstNotificationEvent =
601 notificationEvents.first();
602
603 notificationEvents.remove(firstNotificationEvent);
604 }
605 }
606 }
607
608 private TreeSet<NotificationEvent> _getNotificationEvents() {
609 if (_notificationEvents == null) {
610 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
611 }
612
613 return _notificationEvents;
614 }
615
616 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
617 if (_unconfirmedNotificationEvents == null) {
618 _unconfirmedNotificationEvents =
619 new HashMap<String, NotificationEvent>();
620 }
621
622 return _unconfirmedNotificationEvents;
623 }
624
625 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
626
627 private static Comparator<NotificationEvent> _comparator =
628 new NotificationEventComparator();
629
630 private TreeSet<NotificationEvent> _notificationEvents;
631 private ReentrantLock _reentrantLock = new ReentrantLock();
632 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
633
634 }