001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.notifications.Channel;
018    import com.liferay.portal.kernel.notifications.ChannelException;
019    import com.liferay.portal.kernel.notifications.ChannelHub;
020    import com.liferay.portal.kernel.notifications.ChannelListener;
021    import com.liferay.portal.kernel.notifications.NotificationEvent;
022    import com.liferay.portal.kernel.notifications.UnknownChannelException;
023    import com.liferay.portal.model.CompanyConstants;
024    import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025    import com.liferay.portal.util.PropsValues;
026    
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.Collections;
030    import java.util.Iterator;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.concurrent.ConcurrentHashMap;
035    import java.util.concurrent.ConcurrentMap;
036    
037    /**
038     * @author Edward Han
039     * @author Brian Wing Shun Chan
040     * @author Shuyang Zhou
041     */
042    public class ChannelHubImpl implements ChannelHub {
043    
044            public void cleanUp() throws ChannelException {
045                    for (Channel channel : _channels.values()) {
046                            channel.cleanUp();
047                    }
048            }
049    
050            public void cleanUp(long userId) throws ChannelException {
051                    Channel channel = getChannel(userId);
052    
053                    channel.cleanUp();
054            }
055    
056            public ChannelHub clone(long companyId) {
057                    ChannelHubImpl channelHubImpl = new ChannelHubImpl();
058    
059                    channelHubImpl.setChannelPrototype(_channel);
060                    channelHubImpl.setCompanyId(companyId);
061    
062                    return channelHubImpl;
063            }
064    
065            public void confirmDelivery(
066                            long userId, Collection<String> notificationEventUuids)
067                    throws ChannelException {
068    
069                    confirmDelivery(userId, notificationEventUuids, false);
070            }
071    
072            public void confirmDelivery(
073                            long userId, Collection<String> notificationEventUuids,
074                            boolean archive)
075                    throws ChannelException {
076    
077                    Channel channel = getChannel(userId);
078    
079                    channel.confirmDelivery(notificationEventUuids, archive);
080            }
081    
082            public void confirmDelivery(long userId, String notificationEventUuid)
083                    throws ChannelException {
084    
085                    confirmDelivery(userId, notificationEventUuid, false);
086            }
087    
088            public void confirmDelivery(
089                            long userId, String notificationEventUuid, boolean archive)
090                    throws ChannelException {
091    
092                    Channel channel = getChannel(userId);
093    
094                    channel.confirmDelivery(notificationEventUuid, archive);
095            }
096    
097            public Channel createChannel(long userId) throws ChannelException {
098                    if (_channels.containsKey(userId)) {
099                            return _channels.get(userId);
100                    }
101    
102                    Channel channel = _channel.clone(_companyId, userId);
103    
104                    Channel oldChannel = _channels.putIfAbsent(userId, channel);
105    
106                    if (oldChannel != null) {
107                            channel.sendNotificationEvents(oldChannel.getNotificationEvents());
108                    }
109    
110                    channel.init();
111    
112                    return channel;
113            }
114    
115            public void deleteUserNotificiationEvent(
116                            long userId, String notificationEventUuid)
117                    throws ChannelException {
118    
119                    Channel channel = getChannel(userId);
120    
121                    channel.deleteUserNotificiationEvent(notificationEventUuid);
122            }
123    
124            public void deleteUserNotificiationEvents(
125                            long userId, Collection<String> notificationEventUuids)
126                    throws ChannelException {
127    
128                    Channel channel = getChannel(userId);
129    
130                    channel.deleteUserNotificiationEvents(notificationEventUuids);
131            }
132    
133            public void destroy() throws ChannelException {
134                    Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
135    
136                    Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
137    
138                    while (itr.hasNext()) {
139                            Channel channel = itr.next().getValue();
140    
141                            channel.close();
142    
143                            itr.remove();
144                    }
145            }
146    
147            public Channel destroyChannel(long userId) throws ChannelException {
148                    Channel channel = _channels.remove(userId);
149    
150                    if (channel != null) {
151                            channel.close();
152                    }
153    
154                    return channel;
155            }
156    
157            public Channel fetchChannel(long userId) throws ChannelException {
158                    return fetchChannel(userId, false);
159            }
160    
161            public Channel fetchChannel(long userId, boolean createIfAbsent)
162                    throws ChannelException {
163    
164                    Channel channel = _channels.get(userId);
165    
166                    if (channel == null) {
167                            synchronized (_channels) {
168                                    channel = _channels.get(userId);
169    
170                                    if (channel == null) {
171                                            if (createIfAbsent) {
172                                                    channel = createChannel(userId);
173                                            }
174                                    }
175                            }
176                    }
177    
178                    return channel;
179            }
180    
181            public List<NotificationEvent> fetchNotificationEvents(long userId)
182                    throws ChannelException {
183    
184                    return fetchNotificationEvents(userId, false);
185            }
186    
187            public List<NotificationEvent> fetchNotificationEvents(
188                            long userId, boolean flush)
189                    throws ChannelException {
190    
191                    Channel channel = fetchChannel(userId);
192    
193                    if (channel == null) {
194                            return Collections.emptyList();
195                    }
196    
197                    return channel.getNotificationEvents(flush);
198            }
199    
200            public void flush() throws ChannelException {
201                    for (Channel channel : _channels.values()) {
202                            channel.flush();
203                    }
204            }
205    
206            public void flush(long userId) throws ChannelException {
207                    Channel channel = fetchChannel(userId);
208    
209                    if (channel != null) {
210                            channel.flush();
211                    }
212            }
213    
214            public void flush(long userId, long timestamp) throws ChannelException {
215                    Channel channel = fetchChannel(userId);
216    
217                    if (channel != null) {
218                            channel.flush(timestamp);
219                    }
220            }
221    
222            public Channel getChannel(long userId) throws ChannelException {
223                    return getChannel(userId, false);
224            }
225    
226            public Channel getChannel(long userId, boolean createIfAbsent)
227                    throws ChannelException {
228    
229                    Channel channel = fetchChannel(userId, createIfAbsent);
230    
231                    if (channel == null) {
232                            throw new UnknownChannelException(
233                                    "No channel exists with user id " + userId);
234                    }
235    
236                    return channel;
237            }
238    
239            public long getCompanyId() {
240                    return _companyId;
241            }
242    
243            public List<NotificationEvent> getNotificationEvents(long userId)
244                    throws ChannelException {
245    
246                    return getNotificationEvents(userId, false);
247            }
248    
249            public List<NotificationEvent> getNotificationEvents(
250                            long userId, boolean flush)
251                    throws ChannelException {
252    
253                    Channel channel = getChannel(userId, false);
254    
255                    return channel.getNotificationEvents(flush);
256            }
257    
258            public Collection<Long> getUserIds() {
259                    return Collections.unmodifiableSet(_channels.keySet());
260            }
261    
262            public void registerChannelListener(
263                            long userId, ChannelListener channelListener)
264                    throws ChannelException {
265    
266                    Channel channel = getChannel(userId);
267    
268                    channel.registerChannelListener(channelListener);
269            }
270    
271            public void removeTransientNotificationEvents(
272                            long userId, Collection<NotificationEvent> notificationEvents)
273                    throws ChannelException {
274    
275                    Channel channel = fetchChannel(userId);
276    
277                    if (channel != null) {
278                            channel.removeTransientNotificationEvents(notificationEvents);
279                    }
280            }
281    
282            public void removeTransientNotificationEventsByUuid(
283                            long userId, Collection<String> notificationEventUuids)
284                    throws ChannelException {
285    
286                    Channel channel = fetchChannel(userId);
287    
288                    if (channel != null) {
289                            channel.removeTransientNotificationEventsByUuid(
290                                    notificationEventUuids);
291                    }
292            }
293    
294            public void sendNotificationEvent(
295                            long userId, NotificationEvent notificationEvent)
296                    throws ChannelException {
297    
298                    Channel channel = fetchChannel(userId);
299    
300                    if (channel != null) {
301                            channel.sendNotificationEvent(notificationEvent);
302    
303                            return;
304                    }
305    
306                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
307                            !notificationEvent.isDeliveryRequired()) {
308    
309                            return;
310                    }
311    
312                    try {
313                            UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
314                                    userId, notificationEvent);
315                    }
316                    catch (Exception e) {
317                            throw new ChannelException("Unable to send event", e);
318                    }
319            }
320    
321            public void sendNotificationEvents(
322                            long userId, Collection<NotificationEvent> notificationEvents)
323                    throws ChannelException {
324    
325                    Channel channel = fetchChannel(userId);
326    
327                    if (channel != null) {
328                            channel.sendNotificationEvents(notificationEvents);
329    
330                            return;
331                    }
332    
333                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
334                            return;
335                    }
336    
337                    List<NotificationEvent> persistedNotificationEvents =
338                            new ArrayList<NotificationEvent>(notificationEvents.size());
339    
340                    for (NotificationEvent notificationEvent : notificationEvents) {
341                            if (notificationEvent.isDeliveryRequired()) {
342                                    persistedNotificationEvents.add(notificationEvent);
343                            }
344                    }
345    
346                    try {
347                            UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
348                                    userId, persistedNotificationEvents);
349                    }
350                    catch (Exception e) {
351                            throw new ChannelException("Unable to send events", e);
352                    }
353            }
354    
355            public void setChannelPrototype(Channel channel) {
356                    _channel = channel;
357            }
358    
359            public void setCompanyId(long companyId) {
360                    _companyId = companyId;
361            }
362    
363            public void unregisterChannelListener(
364                            long userId, ChannelListener channelListener)
365                    throws ChannelException {
366    
367                    Channel channel = getChannel(userId);
368    
369                    channel.unregisterChannelListener(channelListener);
370            }
371    
372            private Channel _channel;
373            private ConcurrentMap<Long, Channel> _channels =
374                    new ConcurrentHashMap<Long, Channel>();
375            private long _companyId = CompanyConstants.SYSTEM;
376    
377    }