001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterRequest;
019    import com.liferay.portal.kernel.notifications.Channel;
020    import com.liferay.portal.kernel.notifications.ChannelException;
021    import com.liferay.portal.kernel.notifications.ChannelHub;
022    import com.liferay.portal.kernel.notifications.ChannelHubManager;
023    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
024    import com.liferay.portal.kernel.notifications.ChannelListener;
025    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
026    import com.liferay.portal.kernel.notifications.NotificationEvent;
027    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
028    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.MethodKey;
031    
032    import java.util.Collection;
033    import java.util.Collections;
034    import java.util.List;
035    import java.util.concurrent.ConcurrentHashMap;
036    import java.util.concurrent.ConcurrentMap;
037    
038    /**
039     * @author Edward Han
040     * @author Brian Wing Shun
041     * @author Shuyang Zhou
042     */
043    @DoPrivileged
044    public class ChannelHubManagerImpl implements ChannelHubManager {
045    
046            @Override
047            public void confirmDelivery(
048                            long companyId, long userId,
049                            Collection<String> notificationEventUuids)
050                    throws ChannelException {
051    
052                    confirmDelivery(companyId, userId, notificationEventUuids, false);
053            }
054    
055            @Override
056            public void confirmDelivery(
057                            long companyId, long userId,
058                            Collection<String> notificationEventUuids, boolean archive)
059                    throws ChannelException {
060    
061                    ChannelHub channelHub = getChannelHub(companyId);
062    
063                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
064            }
065    
066            @Override
067            public void confirmDelivery(
068                            long companyId, long userId, String notificationEventUuid)
069                    throws ChannelException {
070    
071                    confirmDelivery(companyId, userId, notificationEventUuid, false);
072            }
073    
074            @Override
075            public void confirmDelivery(
076                            long companyId, long userId, String notificationEventUuid,
077                            boolean archive)
078                    throws ChannelException {
079    
080                    ChannelHub channelHub = getChannelHub(companyId);
081    
082                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
083            }
084    
085            @Override
086            public Channel createChannel(long companyId, long userId)
087                    throws ChannelException {
088    
089                    ChannelHub channelHub = getChannelHub(companyId);
090    
091                    return channelHub.createChannel(userId);
092            }
093    
094            @Override
095            public ChannelHub createChannelHub(long companyId) throws ChannelException {
096                    ChannelHub channelHub = _channelHub.clone(companyId);
097    
098                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
099                            throw new DuplicateChannelHubException(
100                                    "Channel already exists with company id " + companyId);
101                    }
102    
103                    return channelHub;
104            }
105    
106            @Override
107            public void deleteUserNotificiationEvent(
108                            long companyId, long userId, String notificationEventUuid)
109                    throws ChannelException {
110    
111                    ChannelHub channelHub = getChannelHub(companyId);
112    
113                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
114            }
115    
116            @Override
117            public void deleteUserNotificiationEvents(
118                            long companyId, long userId,
119                            Collection<String> notificationEventUuids)
120                    throws ChannelException {
121    
122                    ChannelHub channelHub = getChannelHub(companyId);
123    
124                    channelHub.deleteUserNotificiationEvents(
125                            userId, notificationEventUuids);
126            }
127    
128            @Override
129            public void destroyChannel(long companyId, long userId)
130                    throws ChannelException {
131    
132                    ChannelHub channelHub = getChannelHub(companyId);
133    
134                    channelHub.destroyChannel(userId);
135            }
136    
137            @Override
138            public void destroyChannelHub(long companyId) throws ChannelException {
139                    ChannelHub channelHub = _channelHubs.remove(companyId);
140    
141                    if (channelHub != null) {
142                            channelHub.destroy();
143                    }
144            }
145    
146            @Override
147            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
148                    return fetchChannelHub(companyId, false);
149            }
150    
151            @Override
152            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
153                    throws ChannelException {
154    
155                    ChannelHub channelHub = _channelHubs.get(companyId);
156    
157                    if (channelHub == null) {
158                            synchronized(_channelHubs) {
159                                    channelHub = _channelHubs.get(companyId);
160    
161                                    if (channelHub == null) {
162                                            if (createIfAbsent) {
163                                                    channelHub = createChannelHub(companyId);
164                                            }
165                                    }
166                            }
167                    }
168    
169                    return channelHub;
170            }
171    
172            @Override
173            public List<NotificationEvent> fetchNotificationEvents(
174                            long companyId, long userId, boolean flush)
175                    throws ChannelException {
176    
177                    ChannelHub channelHub = fetchChannelHub(companyId);
178    
179                    if (channelHub == null) {
180                            return Collections.emptyList();
181                    }
182    
183                    return channelHub.fetchNotificationEvents(userId, flush);
184            }
185    
186            @Override
187            public void flush() throws ChannelException {
188                    for (ChannelHub channelHub : _channelHubs.values()) {
189                            channelHub.flush();
190                    }
191            }
192    
193            @Override
194            public void flush(long companyId) throws ChannelException {
195                    ChannelHub channelHub = fetchChannelHub(companyId);
196    
197                    if (channelHub != null) {
198                            channelHub.flush();
199                    }
200            }
201    
202            @Override
203            public void flush(long companyId, long userId, long timestamp)
204                    throws ChannelException {
205    
206                    ChannelHub channelHub = fetchChannelHub(companyId);
207    
208                    if (channelHub != null) {
209                            channelHub.flush(userId, timestamp);
210                    }
211            }
212    
213            @Override
214            public Channel getChannel(long companyId, long userId)
215                    throws ChannelException {
216    
217                    return getChannel(companyId, userId, false);
218            }
219    
220            @Override
221            public Channel getChannel(
222                            long companyId, long userId, boolean createIfAbsent)
223                    throws ChannelException {
224    
225                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
226    
227                    return channelHub.getChannel(userId, createIfAbsent);
228            }
229    
230            @Override
231            public ChannelHub getChannelHub(long companyId) throws ChannelException {
232                    return getChannelHub(companyId, false);
233            }
234    
235            @Override
236            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
237                    throws ChannelException {
238    
239                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
240    
241                    if (channelHub == null) {
242                            throw new UnknownChannelHubException(
243                                    "No channel exists with company id " + companyId);
244                    }
245    
246                    return channelHub;
247            }
248    
249            @Override
250            public List<NotificationEvent> getNotificationEvents(
251                            long companyId, long userId)
252                    throws ChannelException {
253    
254                    ChannelHub channelHub = getChannelHub(companyId);
255    
256                    return channelHub.getNotificationEvents(userId);
257            }
258    
259            @Override
260            public List<NotificationEvent> getNotificationEvents(
261                            long companyId, long userId, boolean flush)
262                    throws ChannelException {
263    
264                    ChannelHub channelHub = getChannelHub(companyId);
265    
266                    return channelHub.getNotificationEvents(userId, flush);
267            }
268    
269            @Override
270            public Collection<Long> getUserIds(long companyId) throws ChannelException {
271                    ChannelHub channelHub = getChannelHub(companyId);
272    
273                    return channelHub.getUserIds();
274            }
275    
276            @Override
277            public void registerChannelListener(
278                            long companyId, long userId, ChannelListener channelListener)
279                    throws ChannelException {
280    
281                    ChannelHub channelHub = getChannelHub(companyId);
282    
283                    channelHub.registerChannelListener(userId, channelListener);
284            }
285    
286            @Override
287            public void removeTransientNotificationEvents(
288                            long companyId, long userId,
289                            Collection<NotificationEvent> notificationEvents)
290                    throws ChannelException {
291    
292                    ChannelHub channelHub = getChannelHub(companyId);
293    
294                    channelHub.removeTransientNotificationEvents(
295                            userId, notificationEvents);
296            }
297    
298            @Override
299            public void removeTransientNotificationEventsByUuid(
300                            long companyId, long userId,
301                            Collection<String> notificationEventUuids)
302                    throws ChannelException {
303    
304                    ChannelHub channelHub = getChannelHub(companyId);
305    
306                    channelHub.removeTransientNotificationEventsByUuid(
307                            userId, notificationEventUuids);
308            }
309    
310            @Override
311            public void sendClusterNotificationEvent(
312                            long companyId, long userId, NotificationEvent notificationEvent)
313                    throws ChannelException {
314    
315                    sendNotificationEvent(companyId, userId, notificationEvent);
316    
317                    MethodHandler methodHandler = new MethodHandler(
318                            _storeNotificationEventMethodKey, companyId, userId,
319                            notificationEvent);
320    
321                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
322                            methodHandler, true);
323    
324                    try {
325                            ClusterExecutorUtil.execute(clusterRequest);
326                    }
327                    catch (Exception e) {
328                            throw new ChannelException("Unable to notify cluster of event", e);
329                    }
330            }
331    
332            @Override
333            public void sendNotificationEvent(
334                            long companyId, long userId, NotificationEvent notificationEvent)
335                    throws ChannelException {
336    
337                    ChannelHub channelHub = getChannelHub(companyId);
338    
339                    channelHub.sendNotificationEvent(userId, notificationEvent);
340            }
341    
342            @Override
343            public void sendNotificationEvents(
344                            long companyId, long userId,
345                            Collection<NotificationEvent> notificationEvents)
346                    throws ChannelException {
347    
348                    ChannelHub channelHub = getChannelHub(companyId);
349    
350                    channelHub.sendNotificationEvents(userId, notificationEvents);
351            }
352    
353            public void setChannelHubPrototype(ChannelHub channelHub) {
354                    _channelHub = channelHub;
355            }
356    
357            @Override
358            public void storeNotificationEvent(
359                            long companyId, long userId, NotificationEvent notificationEvent)
360                    throws ChannelException {
361    
362                    ChannelHub channelHub = getChannelHub(companyId);
363    
364                    channelHub.storeNotificationEvent(userId, notificationEvent);
365            }
366    
367            @Override
368            public void unregisterChannelListener(
369                            long companyId, long userId, ChannelListener channelListener)
370                    throws ChannelException {
371    
372                    ChannelHub channelHub = getChannelHub(companyId);
373    
374                    channelHub.unregisterChannelListener(userId, channelListener);
375            }
376    
377            private static final MethodKey _storeNotificationEventMethodKey =
378                    new MethodKey(
379                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
380                            long.class, NotificationEvent.class);
381    
382            private ChannelHub _channelHub;
383            private ConcurrentMap<Long, ChannelHub> _channelHubs =
384                    new ConcurrentHashMap<Long, ChannelHub>();
385    
386    }