001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.notifications.Channel;
018    import com.liferay.portal.kernel.notifications.ChannelException;
019    import com.liferay.portal.kernel.notifications.ChannelHub;
020    import com.liferay.portal.kernel.notifications.ChannelListener;
021    import com.liferay.portal.kernel.notifications.NotificationEvent;
022    import com.liferay.portal.kernel.notifications.UnknownChannelException;
023    import com.liferay.portal.kernel.service.UserNotificationEventLocalServiceUtil;
024    import com.liferay.portal.util.PropsValues;
025    
026    import java.util.ArrayList;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.Iterator;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.Set;
033    import java.util.concurrent.ConcurrentHashMap;
034    import java.util.concurrent.ConcurrentMap;
035    
036    /**
037     * @author Edward Han
038     * @author Brian Wing Shun Chan
039     * @author Shuyang Zhou
040     */
041    public class ChannelHubImpl implements ChannelHub {
042    
043            public ChannelHubImpl(long companyId) {
044                    _companyId = companyId;
045            }
046    
047            @Override
048            public void cleanUp() throws ChannelException {
049                    for (Channel channel : _channels.values()) {
050                            channel.cleanUp();
051                    }
052            }
053    
054            @Override
055            public void cleanUp(long userId) throws ChannelException {
056                    Channel channel = getChannel(userId);
057    
058                    channel.cleanUp();
059            }
060    
061            @Override
062            public void confirmDelivery(
063                            long userId, Collection<String> notificationEventUuids)
064                    throws ChannelException {
065    
066                    confirmDelivery(userId, notificationEventUuids, false);
067            }
068    
069            @Override
070            public void confirmDelivery(
071                            long userId, Collection<String> notificationEventUuids,
072                            boolean archive)
073                    throws ChannelException {
074    
075                    Channel channel = getChannel(userId);
076    
077                    channel.confirmDelivery(notificationEventUuids, archive);
078            }
079    
080            @Override
081            public void confirmDelivery(long userId, String notificationEventUuid)
082                    throws ChannelException {
083    
084                    confirmDelivery(userId, notificationEventUuid, false);
085            }
086    
087            @Override
088            public void confirmDelivery(
089                            long userId, String notificationEventUuid, boolean archive)
090                    throws ChannelException {
091    
092                    Channel channel = getChannel(userId);
093    
094                    channel.confirmDelivery(notificationEventUuid, archive);
095            }
096    
097            @Override
098            public Channel createChannel(long userId) throws ChannelException {
099                    if (_channels.containsKey(userId)) {
100                            return _channels.get(userId);
101                    }
102    
103                    Channel channel = new ChannelImpl(_companyId, userId);
104    
105                    Channel oldChannel = _channels.putIfAbsent(userId, channel);
106    
107                    if (oldChannel != null) {
108                            channel.sendNotificationEvents(oldChannel.getNotificationEvents());
109                    }
110    
111                    channel.init();
112    
113                    return channel;
114            }
115    
116            @Override
117            public void deleteUserNotificiationEvent(
118                            long userId, String notificationEventUuid)
119                    throws ChannelException {
120    
121                    Channel channel = getChannel(userId);
122    
123                    channel.deleteUserNotificiationEvent(notificationEventUuid);
124            }
125    
126            @Override
127            public void deleteUserNotificiationEvents(
128                            long userId, Collection<String> notificationEventUuids)
129                    throws ChannelException {
130    
131                    Channel channel = getChannel(userId);
132    
133                    channel.deleteUserNotificiationEvents(notificationEventUuids);
134            }
135    
136            @Override
137            public void destroy() throws ChannelException {
138                    Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
139    
140                    Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
141    
142                    while (itr.hasNext()) {
143                            Channel channel = itr.next().getValue();
144    
145                            channel.close();
146    
147                            itr.remove();
148                    }
149            }
150    
151            @Override
152            public Channel destroyChannel(long userId) throws ChannelException {
153                    Channel channel = _channels.remove(userId);
154    
155                    if (channel != null) {
156                            channel.close();
157                    }
158    
159                    return channel;
160            }
161    
162            @Override
163            public Channel fetchChannel(long userId) throws ChannelException {
164                    return fetchChannel(userId, false);
165            }
166    
167            @Override
168            public Channel fetchChannel(long userId, boolean createIfAbsent)
169                    throws ChannelException {
170    
171                    Channel channel = _channels.get(userId);
172    
173                    if (channel == null) {
174                            synchronized (_channels) {
175                                    channel = _channels.get(userId);
176    
177                                    if (channel == null) {
178                                            if (createIfAbsent) {
179                                                    channel = createChannel(userId);
180                                            }
181                                    }
182                            }
183                    }
184    
185                    return channel;
186            }
187    
188            @Override
189            public List<NotificationEvent> fetchNotificationEvents(long userId)
190                    throws ChannelException {
191    
192                    return fetchNotificationEvents(userId, false);
193            }
194    
195            @Override
196            public List<NotificationEvent> fetchNotificationEvents(
197                            long userId, boolean flush)
198                    throws ChannelException {
199    
200                    Channel channel = fetchChannel(userId);
201    
202                    if (channel == null) {
203                            return Collections.emptyList();
204                    }
205    
206                    return channel.getNotificationEvents(flush);
207            }
208    
209            @Override
210            public void flush() throws ChannelException {
211                    for (Channel channel : _channels.values()) {
212                            channel.flush();
213                    }
214            }
215    
216            @Override
217            public void flush(long userId) throws ChannelException {
218                    Channel channel = fetchChannel(userId);
219    
220                    if (channel != null) {
221                            channel.flush();
222                    }
223            }
224    
225            @Override
226            public void flush(long userId, long timestamp) throws ChannelException {
227                    Channel channel = fetchChannel(userId);
228    
229                    if (channel != null) {
230                            channel.flush(timestamp);
231                    }
232            }
233    
234            @Override
235            public Channel getChannel(long userId) throws ChannelException {
236                    return getChannel(userId, false);
237            }
238    
239            @Override
240            public Channel getChannel(long userId, boolean createIfAbsent)
241                    throws ChannelException {
242    
243                    Channel channel = fetchChannel(userId, createIfAbsent);
244    
245                    if (channel == null) {
246                            throw new UnknownChannelException(
247                                    "No channel exists with user id " + userId);
248                    }
249    
250                    return channel;
251            }
252    
253            public long getCompanyId() {
254                    return _companyId;
255            }
256    
257            @Override
258            public List<NotificationEvent> getNotificationEvents(long userId)
259                    throws ChannelException {
260    
261                    return getNotificationEvents(userId, false);
262            }
263    
264            @Override
265            public List<NotificationEvent> getNotificationEvents(
266                            long userId, boolean flush)
267                    throws ChannelException {
268    
269                    Channel channel = getChannel(userId, false);
270    
271                    return channel.getNotificationEvents(flush);
272            }
273    
274            @Override
275            public Collection<Long> getUserIds() {
276                    return Collections.unmodifiableSet(_channels.keySet());
277            }
278    
279            @Override
280            public void registerChannelListener(
281                            long userId, ChannelListener channelListener)
282                    throws ChannelException {
283    
284                    Channel channel = getChannel(userId);
285    
286                    channel.registerChannelListener(channelListener);
287            }
288    
289            @Override
290            public void removeTransientNotificationEvents(
291                            long userId, Collection<NotificationEvent> notificationEvents)
292                    throws ChannelException {
293    
294                    Channel channel = fetchChannel(userId);
295    
296                    if (channel != null) {
297                            channel.removeTransientNotificationEvents(notificationEvents);
298                    }
299            }
300    
301            @Override
302            public void removeTransientNotificationEventsByUuid(
303                            long userId, Collection<String> notificationEventUuids)
304                    throws ChannelException {
305    
306                    Channel channel = fetchChannel(userId);
307    
308                    if (channel != null) {
309                            channel.removeTransientNotificationEventsByUuid(
310                                    notificationEventUuids);
311                    }
312            }
313    
314            @Override
315            public void sendNotificationEvent(
316                            long userId, NotificationEvent notificationEvent)
317                    throws ChannelException {
318    
319                    Channel channel = fetchChannel(userId);
320    
321                    if (channel != null) {
322                            channel.sendNotificationEvent(notificationEvent);
323    
324                            return;
325                    }
326    
327                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
328                            !notificationEvent.isDeliveryRequired()) {
329    
330                            return;
331                    }
332    
333                    try {
334                            UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
335                                    userId, notificationEvent);
336                    }
337                    catch (Exception e) {
338                            throw new ChannelException("Unable to send event", e);
339                    }
340            }
341    
342            @Override
343            public void sendNotificationEvents(
344                            long userId, Collection<NotificationEvent> notificationEvents)
345                    throws ChannelException {
346    
347                    Channel channel = fetchChannel(userId);
348    
349                    if (channel != null) {
350                            channel.sendNotificationEvents(notificationEvents);
351    
352                            return;
353                    }
354    
355                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
356                            return;
357                    }
358    
359                    List<NotificationEvent> persistedNotificationEvents = new ArrayList<>(
360                            notificationEvents.size());
361    
362                    for (NotificationEvent notificationEvent : notificationEvents) {
363                            if (notificationEvent.isDeliveryRequired()) {
364                                    persistedNotificationEvents.add(notificationEvent);
365                            }
366                    }
367    
368                    try {
369                            UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
370                                    userId, persistedNotificationEvents);
371                    }
372                    catch (Exception e) {
373                            throw new ChannelException("Unable to send events", e);
374                    }
375            }
376    
377            @Override
378            public void storeNotificationEvent(
379                            long userId, NotificationEvent notificationEvent)
380                    throws ChannelException {
381    
382                    Channel channel = fetchChannel(userId);
383    
384                    if (channel != null) {
385                            channel.storeNotificationEvent(
386                                    notificationEvent, System.currentTimeMillis());
387                    }
388            }
389    
390            @Override
391            public void unregisterChannelListener(
392                            long userId, ChannelListener channelListener)
393                    throws ChannelException {
394    
395                    Channel channel = getChannel(userId);
396    
397                    channel.unregisterChannelListener(channelListener);
398            }
399    
400            private final ConcurrentMap<Long, Channel> _channels =
401                    new ConcurrentHashMap<>();
402            private final long _companyId;
403    
404    }