001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 public Channel clone(long companyId, long userId) {
062 return new ChannelImpl(companyId, userId);
063 }
064
065 public void confirmDelivery(Collection<String> notificationEventUuids)
066 throws ChannelException {
067
068 confirmDelivery(notificationEventUuids, false);
069 }
070
071 public void confirmDelivery(
072 Collection<String> notificationEventUuids, boolean archive)
073 throws ChannelException {
074
075 _reentrantLock.lock();
076
077 try {
078 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
079 if (archive) {
080 UserNotificationEventLocalServiceUtil.
081 updateUserNotificationEvents(
082 notificationEventUuids, getCompanyId(), archive);
083 }
084 else {
085 UserNotificationEventLocalServiceUtil.
086 deleteUserNotificationEvents(
087 notificationEventUuids, getCompanyId());
088 }
089 }
090
091 for (String notificationEventUuid : notificationEventUuids) {
092 Map<String, NotificationEvent> unconfirmedNotificationEvents =
093 _getUnconfirmedNotificationEvents();
094
095 unconfirmedNotificationEvents.remove(notificationEventUuid);
096 }
097 }
098 catch (Exception e) {
099 throw new ChannelException(
100 "Unable to confirm delivery for user " + getUserId() , e);
101 }
102 finally {
103 _reentrantLock.unlock();
104 }
105 }
106
107 public void confirmDelivery(String notificationEventUuid)
108 throws ChannelException {
109
110 confirmDelivery(notificationEventUuid, false);
111 }
112
113 public void confirmDelivery(String notificationEventUuid, boolean archive)
114 throws ChannelException {
115
116 _reentrantLock.lock();
117
118 try {
119 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
120 if (archive) {
121 UserNotificationEventLocalServiceUtil.
122 updateUserNotificationEvent(
123 notificationEventUuid, getCompanyId(), archive);
124 }
125 else {
126 UserNotificationEventLocalServiceUtil.
127 deleteUserNotificationEvent(
128 notificationEventUuid, getCompanyId());
129 }
130 }
131
132 Map<String, NotificationEvent> unconfirmedNotificationEvents =
133 _getUnconfirmedNotificationEvents();
134
135 unconfirmedNotificationEvents.remove(notificationEventUuid);
136 }
137 catch (Exception e) {
138 throw new ChannelException(
139 "Unable to confirm delivery for " + notificationEventUuid , e);
140 }
141 finally {
142 _reentrantLock.unlock();
143 }
144 }
145
146 public void deleteUserNotificiationEvent(String notificationEventUuid)
147 throws ChannelException {
148
149 _reentrantLock.lock();
150
151 try {
152 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
153 notificationEventUuid, getCompanyId());
154
155 Map<String, NotificationEvent> unconfirmedNotificationEvents =
156 _getUnconfirmedNotificationEvents();
157
158 unconfirmedNotificationEvents.remove(notificationEventUuid);
159 }
160 catch (Exception e) {
161 throw new ChannelException(
162 "Unable to delete event " + notificationEventUuid , e);
163 }
164 finally {
165 _reentrantLock.unlock();
166 }
167 }
168
169 public void deleteUserNotificiationEvents(
170 Collection<String> notificationEventUuids)
171 throws ChannelException {
172
173 _reentrantLock.lock();
174
175 try {
176 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
177 notificationEventUuids, getCompanyId());
178
179 for (String notificationEventUuid : notificationEventUuids) {
180 Map<String, NotificationEvent> unconfirmedNotificationEvents =
181 _getUnconfirmedNotificationEvents();
182
183 unconfirmedNotificationEvents.remove(notificationEventUuid);
184 }
185 }
186 catch (Exception e) {
187 throw new ChannelException(
188 "Unable to delete events for user " + getUserId() , e);
189 }
190 finally {
191 _reentrantLock.unlock();
192 }
193 }
194
195 public void flush() {
196 _reentrantLock.lock();
197
198 try {
199 if (_notificationEvents != null) {
200 _notificationEvents.clear();
201 }
202 }
203 finally {
204 _reentrantLock.unlock();
205 }
206 }
207
208 public void flush(long timestamp) {
209 _reentrantLock.lock();
210
211 try {
212 if (_notificationEvents == null) {
213 return;
214 }
215
216 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
217
218 while (itr.hasNext()) {
219 NotificationEvent notificationEvent = itr.next();
220
221 if (notificationEvent.getTimestamp() < timestamp) {
222 itr.remove();
223 }
224 }
225 }
226 finally {
227 _reentrantLock.unlock();
228 }
229 }
230
231 public List<NotificationEvent> getNotificationEvents(boolean flush)
232 throws ChannelException {
233
234 _reentrantLock.lock();
235
236 try {
237 return doGetNotificationEvents(flush);
238 }
239 catch (ChannelException ce) {
240 throw ce;
241 }
242 catch (Exception e) {
243 throw new ChannelException(e);
244 }
245 finally {
246 _reentrantLock.unlock();
247 }
248 }
249
250 public void init() throws ChannelException {
251 _reentrantLock.lock();
252
253 try {
254 doInit();
255 }
256 catch (SystemException se) {
257 throw new ChannelException(
258 "Unable to init channel " + getUserId(), se);
259 }
260 finally {
261 _reentrantLock.unlock();
262 }
263 }
264
265 public void removeTransientNotificationEvents(
266 Collection<NotificationEvent> notificationEvents) {
267
268 _reentrantLock.lock();
269
270 try {
271 if (_notificationEvents != null) {
272 _notificationEvents.removeAll(notificationEvents);
273 }
274 }
275 finally {
276 _reentrantLock.unlock();
277 }
278 }
279
280 public void removeTransientNotificationEventsByUuid(
281 Collection<String> notificationEventUuids) {
282
283 Set<String> notificationEventUuidsSet = new HashSet<String>(
284 notificationEventUuids);
285
286 _reentrantLock.lock();
287
288 try {
289 if (_notificationEvents == null) {
290 return;
291 }
292
293 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
294
295 while (itr.hasNext()) {
296 NotificationEvent notificationEvent = itr.next();
297
298 if (notificationEventUuidsSet.contains(
299 notificationEvent.getUuid())) {
300
301 itr.remove();
302 }
303 }
304 }
305 finally {
306 _reentrantLock.unlock();
307 }
308 }
309
310 public void sendNotificationEvent(NotificationEvent notificationEvent)
311 throws ChannelException {
312
313 _reentrantLock.lock();
314
315 try {
316 long currentTime = System.currentTimeMillis();
317
318 storeNotificationEvent(notificationEvent, currentTime);
319
320 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
321 notificationEvent.isDeliveryRequired()) {
322
323 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
324 getUserId(), notificationEvent);
325 }
326 }
327 catch (Exception e) {
328 throw new ChannelException("Unable to send event", e);
329 }
330 finally {
331 _reentrantLock.unlock();
332 }
333
334 notifyChannelListeners();
335 }
336
337 public void sendNotificationEvents(
338 Collection<NotificationEvent> notificationEvents)
339 throws ChannelException {
340
341 _reentrantLock.lock();
342
343 try {
344 long currentTime = System.currentTimeMillis();
345
346 List<NotificationEvent> persistedNotificationEvents =
347 new ArrayList<NotificationEvent>(notificationEvents.size());
348
349 for (NotificationEvent notificationEvent : notificationEvents) {
350 storeNotificationEvent(notificationEvent, currentTime);
351
352 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
353 notificationEvent.isDeliveryRequired()) {
354
355 persistedNotificationEvents.add(notificationEvent);
356 }
357 }
358
359 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
360 !persistedNotificationEvents.isEmpty()) {
361
362 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
363 getUserId(), persistedNotificationEvents);
364 }
365 }
366 catch (Exception e) {
367 throw new ChannelException("Unable to send event", e);
368 }
369 finally {
370 _reentrantLock.unlock();
371 }
372
373 notifyChannelListeners();
374 }
375
376 @Override
377 protected void doCleanUp() throws Exception {
378 _reentrantLock.lock();
379
380 try {
381 long currentTime = System.currentTimeMillis();
382
383 TreeSet<NotificationEvent> notificationEvents =
384 _getNotificationEvents();
385
386 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
387
388 while (itr1.hasNext()) {
389 NotificationEvent notificationEvent = itr1.next();
390
391 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
392 itr1.remove();
393 }
394 }
395
396 Map<String, NotificationEvent> unconfirmedNotificationEvents =
397 _getUnconfirmedNotificationEvents();
398
399 List<String> invalidNotificationEventUuids = new ArrayList<String>(
400 unconfirmedNotificationEvents.size());
401
402 Set<Map.Entry<String, NotificationEvent>>
403 unconfirmedNotificationEventsSet =
404 unconfirmedNotificationEvents.entrySet();
405
406 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
407 unconfirmedNotificationEventsSet.iterator();
408
409 while (itr2.hasNext()) {
410 Map.Entry<String, NotificationEvent> entry = itr2.next();
411
412 NotificationEvent notificationEvent = entry.getValue();
413
414 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
415 invalidNotificationEventUuids.add(entry.getKey());
416
417 itr2.remove();
418 }
419 }
420
421 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
422 !invalidNotificationEventUuids.isEmpty()) {
423
424 UserNotificationEventLocalServiceUtil.
425 deleteUserNotificationEvents(
426 invalidNotificationEventUuids, getCompanyId());
427 }
428 }
429 catch (Exception e) {
430 throw new ChannelException(
431 "Unable to clean up channel " + getUserId(), e);
432 }
433 finally {
434 _reentrantLock.unlock();
435 }
436 }
437
438 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
439 throws Exception {
440
441 long currentTime = System.currentTimeMillis();
442
443 TreeSet<NotificationEvent> notificationEventsSet =
444 _getNotificationEvents();
445
446 Map<String, NotificationEvent> unconfirmedNotificationEvents =
447 _getUnconfirmedNotificationEvents();
448
449 List<NotificationEvent> notificationEvents =
450 new ArrayList<NotificationEvent>(
451 notificationEventsSet.size() +
452 unconfirmedNotificationEvents.size());
453
454 for (NotificationEvent notificationEvent : notificationEventsSet) {
455 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
456 break;
457 }
458 else {
459 notificationEvents.add(notificationEvent);
460 }
461 }
462
463 if (flush) {
464 notificationEventsSet.clear();
465 }
466 else if (notificationEventsSet.size() != notificationEvents.size()) {
467 notificationEventsSet.retainAll(notificationEvents);
468 }
469
470 List<String> invalidNotificationEventUuids = new ArrayList<String>(
471 unconfirmedNotificationEvents.size());
472
473 Set<Map.Entry<String, NotificationEvent>>
474 unconfirmedNotificationEventsSet =
475 unconfirmedNotificationEvents.entrySet();
476
477 Iterator<Map.Entry<String, NotificationEvent>> itr =
478 unconfirmedNotificationEventsSet.iterator();
479
480 while (itr.hasNext()) {
481 Map.Entry<String, NotificationEvent> entry = itr.next();
482
483 NotificationEvent notificationEvent = entry.getValue();
484
485 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
486 !notificationEvent.isArchived()) {
487
488 invalidNotificationEventUuids.add(notificationEvent.getUuid());
489
490 itr.remove();
491 }
492 else {
493 notificationEvents.add(entry.getValue());
494 }
495 }
496
497 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
498 !invalidNotificationEventUuids.isEmpty()) {
499
500 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
501 invalidNotificationEventUuids, getCompanyId());
502 }
503
504 return notificationEvents;
505 }
506
507 protected void doInit() throws SystemException {
508 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
509 return;
510 }
511
512 List<UserNotificationEvent> userNotificationEvents =
513 UserNotificationEventLocalServiceUtil.getUserNotificationEvents(
514 getUserId(), false);
515
516 Map<String, NotificationEvent> unconfirmedNotificationEvents =
517 _getUnconfirmedNotificationEvents();
518
519 List<String> invalidNotificationEventUuids = new ArrayList<String>(
520 unconfirmedNotificationEvents.size());
521
522 long currentTime = System.currentTimeMillis();
523
524 for (UserNotificationEvent persistedNotificationEvent :
525 userNotificationEvents) {
526
527 try {
528 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
529 persistedNotificationEvent.getPayload());
530
531 NotificationEvent notificationEvent =
532 NotificationEventFactoryUtil.createNotificationEvent(
533 persistedNotificationEvent.getTimestamp(),
534 persistedNotificationEvent.getType(),
535 payloadJSONObject);
536
537 notificationEvent.setDeliveryRequired(
538 persistedNotificationEvent.getDeliverBy());
539
540 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
541
542 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
543 invalidNotificationEventUuids.add(
544 notificationEvent.getUuid());
545 }
546 else {
547 unconfirmedNotificationEvents.put(
548 notificationEvent.getUuid(), notificationEvent);
549 }
550 }
551 catch (JSONException jsone) {
552 _log.error(jsone, jsone);
553
554 invalidNotificationEventUuids.add(
555 persistedNotificationEvent.getUuid());
556 }
557 }
558
559 if (!invalidNotificationEventUuids.isEmpty()) {
560 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
561 invalidNotificationEventUuids, getCompanyId());
562 }
563 }
564
565 protected boolean isRemoveNotificationEvent(
566 NotificationEvent notificationEvent, long currentTime) {
567
568 if ((notificationEvent.getDeliverBy() != 0) &&
569 (notificationEvent.getDeliverBy() <= currentTime)) {
570
571 return true;
572 }
573 else {
574 return false;
575 }
576 }
577
578 protected void storeNotificationEvent(
579 NotificationEvent notificationEvent, long currentTime) {
580
581 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
582 return;
583 }
584
585 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
586 notificationEvent.isDeliveryRequired()) {
587
588 Map<String, NotificationEvent> unconfirmedNotificationEvents =
589 _getUnconfirmedNotificationEvents();
590
591 unconfirmedNotificationEvents.put(
592 notificationEvent.getUuid(), notificationEvent);
593 }
594 else {
595 TreeSet<NotificationEvent> notificationEvents =
596 _getNotificationEvents();
597
598 notificationEvents.add(notificationEvent);
599
600 if (notificationEvents.size() >
601 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
602
603 NotificationEvent firstNotificationEvent =
604 notificationEvents.first();
605
606 notificationEvents.remove(firstNotificationEvent);
607 }
608 }
609 }
610
611 private TreeSet<NotificationEvent> _getNotificationEvents() {
612 if (_notificationEvents == null) {
613 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
614 }
615
616 return _notificationEvents;
617 }
618
619 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
620 if (_unconfirmedNotificationEvents == null) {
621 _unconfirmedNotificationEvents =
622 new HashMap<String, NotificationEvent>();
623 }
624
625 return _unconfirmedNotificationEvents;
626 }
627
628 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
629
630 private static Comparator<NotificationEvent> _comparator =
631 new NotificationEventComparator();
632
633 private TreeSet<NotificationEvent> _notificationEvents;
634 private ReentrantLock _reentrantLock = new ReentrantLock();
635 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
636
637 }