001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterRequest;
019    import com.liferay.portal.kernel.notifications.Channel;
020    import com.liferay.portal.kernel.notifications.ChannelException;
021    import com.liferay.portal.kernel.notifications.ChannelHub;
022    import com.liferay.portal.kernel.notifications.ChannelHubManager;
023    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
024    import com.liferay.portal.kernel.notifications.ChannelListener;
025    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
026    import com.liferay.portal.kernel.notifications.NotificationEvent;
027    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
028    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.MethodKey;
031    
032    import java.util.Collection;
033    import java.util.Collections;
034    import java.util.List;
035    import java.util.concurrent.ConcurrentHashMap;
036    import java.util.concurrent.ConcurrentMap;
037    
038    /**
039     * @author Edward Han
040     * @author Brian Wing Shun
041     * @author Shuyang Zhou
042     */
043    @DoPrivileged
044    public class ChannelHubManagerImpl implements ChannelHubManager {
045    
046            @Override
047            public void confirmDelivery(
048                            long companyId, long userId,
049                            Collection<String> notificationEventUuids)
050                    throws ChannelException {
051    
052                    confirmDelivery(companyId, userId, notificationEventUuids, false);
053            }
054    
055            @Override
056            public void confirmDelivery(
057                            long companyId, long userId,
058                            Collection<String> notificationEventUuids, boolean archive)
059                    throws ChannelException {
060    
061                    ChannelHub channelHub = getChannelHub(companyId);
062    
063                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
064            }
065    
066            @Override
067            public void confirmDelivery(
068                            long companyId, long userId, String notificationEventUuid)
069                    throws ChannelException {
070    
071                    confirmDelivery(companyId, userId, notificationEventUuid, false);
072            }
073    
074            @Override
075            public void confirmDelivery(
076                            long companyId, long userId, String notificationEventUuid,
077                            boolean archive)
078                    throws ChannelException {
079    
080                    ChannelHub channelHub = getChannelHub(companyId);
081    
082                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
083            }
084    
085            @Override
086            public Channel createChannel(long companyId, long userId)
087                    throws ChannelException {
088    
089                    ChannelHub channelHub = getChannelHub(companyId);
090    
091                    return channelHub.createChannel(userId);
092            }
093    
094            @Override
095            public ChannelHub createChannelHub(long companyId) throws ChannelException {
096                    ChannelHub channelHub = _channelHub.clone(companyId);
097    
098                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
099                            throw new DuplicateChannelHubException(
100                                    "Channel already exists with company id " + companyId);
101                    }
102    
103                    return channelHub;
104            }
105    
106            @Override
107            public void deleteUserNotificiationEvent(
108                            long companyId, long userId, String notificationEventUuid)
109                    throws ChannelException {
110    
111                    ChannelHub channelHub = getChannelHub(companyId);
112    
113                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
114            }
115    
116            @Override
117            public void deleteUserNotificiationEvents(
118                            long companyId, long userId,
119                            Collection<String> notificationEventUuids)
120                    throws ChannelException {
121    
122                    ChannelHub channelHub = getChannelHub(companyId);
123    
124                    channelHub.deleteUserNotificiationEvents(
125                            userId, notificationEventUuids);
126            }
127    
128            @Override
129            public void destroyChannel(long companyId, long userId)
130                    throws ChannelException {
131    
132                    ChannelHub channelHub = getChannelHub(companyId);
133    
134                    channelHub.destroyChannel(userId);
135            }
136    
137            @Override
138            public void destroyChannelHub(long companyId) throws ChannelException {
139                    ChannelHub channelHub = _channelHubs.remove(companyId);
140    
141                    if (channelHub != null) {
142                            channelHub.destroy();
143                    }
144            }
145    
146            @Override
147            public void destroyClusterChannel(long companyId, long userId)
148                    throws ChannelException {
149    
150                    destroyChannel(companyId, userId);
151    
152                    MethodHandler methodHandler = new MethodHandler(
153                            _destroyChannelMethodKey, companyId, userId);
154    
155                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
156                            methodHandler, true);
157    
158                    try {
159                            ClusterExecutorUtil.execute(clusterRequest);
160                    }
161                    catch (Exception e) {
162                            throw new ChannelException(
163                                    "Unable to destroy channel across cluster", e);
164                    }
165            }
166    
167            @Override
168            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
169                    return fetchChannelHub(companyId, false);
170            }
171    
172            @Override
173            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
174                    throws ChannelException {
175    
176                    ChannelHub channelHub = _channelHubs.get(companyId);
177    
178                    if (channelHub == null) {
179                            synchronized(_channelHubs) {
180                                    channelHub = _channelHubs.get(companyId);
181    
182                                    if (channelHub == null) {
183                                            if (createIfAbsent) {
184                                                    channelHub = createChannelHub(companyId);
185                                            }
186                                    }
187                            }
188                    }
189    
190                    return channelHub;
191            }
192    
193            @Override
194            public List<NotificationEvent> fetchNotificationEvents(
195                            long companyId, long userId, boolean flush)
196                    throws ChannelException {
197    
198                    ChannelHub channelHub = fetchChannelHub(companyId);
199    
200                    if (channelHub == null) {
201                            return Collections.emptyList();
202                    }
203    
204                    return channelHub.fetchNotificationEvents(userId, flush);
205            }
206    
207            @Override
208            public void flush() throws ChannelException {
209                    for (ChannelHub channelHub : _channelHubs.values()) {
210                            channelHub.flush();
211                    }
212            }
213    
214            @Override
215            public void flush(long companyId) throws ChannelException {
216                    ChannelHub channelHub = fetchChannelHub(companyId);
217    
218                    if (channelHub != null) {
219                            channelHub.flush();
220                    }
221            }
222    
223            @Override
224            public void flush(long companyId, long userId, long timestamp)
225                    throws ChannelException {
226    
227                    ChannelHub channelHub = fetchChannelHub(companyId);
228    
229                    if (channelHub != null) {
230                            channelHub.flush(userId, timestamp);
231                    }
232            }
233    
234            @Override
235            public Channel getChannel(long companyId, long userId)
236                    throws ChannelException {
237    
238                    return getChannel(companyId, userId, false);
239            }
240    
241            @Override
242            public Channel getChannel(
243                            long companyId, long userId, boolean createIfAbsent)
244                    throws ChannelException {
245    
246                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
247    
248                    return channelHub.getChannel(userId, createIfAbsent);
249            }
250    
251            @Override
252            public ChannelHub getChannelHub(long companyId) throws ChannelException {
253                    return getChannelHub(companyId, false);
254            }
255    
256            @Override
257            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
258                    throws ChannelException {
259    
260                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
261    
262                    if (channelHub == null) {
263                            throw new UnknownChannelHubException(
264                                    "No channel exists with company id " + companyId);
265                    }
266    
267                    return channelHub;
268            }
269    
270            @Override
271            public List<NotificationEvent> getNotificationEvents(
272                            long companyId, long userId)
273                    throws ChannelException {
274    
275                    ChannelHub channelHub = getChannelHub(companyId);
276    
277                    return channelHub.getNotificationEvents(userId);
278            }
279    
280            @Override
281            public List<NotificationEvent> getNotificationEvents(
282                            long companyId, long userId, boolean flush)
283                    throws ChannelException {
284    
285                    ChannelHub channelHub = getChannelHub(companyId);
286    
287                    return channelHub.getNotificationEvents(userId, flush);
288            }
289    
290            @Override
291            public Collection<Long> getUserIds(long companyId) throws ChannelException {
292                    ChannelHub channelHub = getChannelHub(companyId);
293    
294                    return channelHub.getUserIds();
295            }
296    
297            @Override
298            public void registerChannelListener(
299                            long companyId, long userId, ChannelListener channelListener)
300                    throws ChannelException {
301    
302                    ChannelHub channelHub = getChannelHub(companyId);
303    
304                    channelHub.registerChannelListener(userId, channelListener);
305            }
306    
307            @Override
308            public void removeTransientNotificationEvents(
309                            long companyId, long userId,
310                            Collection<NotificationEvent> notificationEvents)
311                    throws ChannelException {
312    
313                    ChannelHub channelHub = getChannelHub(companyId);
314    
315                    channelHub.removeTransientNotificationEvents(
316                            userId, notificationEvents);
317            }
318    
319            @Override
320            public void removeTransientNotificationEventsByUuid(
321                            long companyId, long userId,
322                            Collection<String> notificationEventUuids)
323                    throws ChannelException {
324    
325                    ChannelHub channelHub = getChannelHub(companyId);
326    
327                    channelHub.removeTransientNotificationEventsByUuid(
328                            userId, notificationEventUuids);
329            }
330    
331            @Override
332            public void sendClusterNotificationEvent(
333                            long companyId, long userId, NotificationEvent notificationEvent)
334                    throws ChannelException {
335    
336                    sendNotificationEvent(companyId, userId, notificationEvent);
337    
338                    MethodHandler methodHandler = new MethodHandler(
339                            _storeNotificationEventMethodKey, companyId, userId,
340                            notificationEvent);
341    
342                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
343                            methodHandler, true);
344    
345                    try {
346                            ClusterExecutorUtil.execute(clusterRequest);
347                    }
348                    catch (Exception e) {
349                            throw new ChannelException("Unable to notify cluster of event", e);
350                    }
351            }
352    
353            @Override
354            public void sendNotificationEvent(
355                            long companyId, long userId, NotificationEvent notificationEvent)
356                    throws ChannelException {
357    
358                    ChannelHub channelHub = getChannelHub(companyId);
359    
360                    channelHub.sendNotificationEvent(userId, notificationEvent);
361            }
362    
363            @Override
364            public void sendNotificationEvents(
365                            long companyId, long userId,
366                            Collection<NotificationEvent> notificationEvents)
367                    throws ChannelException {
368    
369                    ChannelHub channelHub = getChannelHub(companyId);
370    
371                    channelHub.sendNotificationEvents(userId, notificationEvents);
372            }
373    
374            public void setChannelHubPrototype(ChannelHub channelHub) {
375                    _channelHub = channelHub;
376            }
377    
378            @Override
379            public void storeNotificationEvent(
380                            long companyId, long userId, NotificationEvent notificationEvent)
381                    throws ChannelException {
382    
383                    ChannelHub channelHub = getChannelHub(companyId);
384    
385                    channelHub.storeNotificationEvent(userId, notificationEvent);
386            }
387    
388            @Override
389            public void unregisterChannelListener(
390                            long companyId, long userId, ChannelListener channelListener)
391                    throws ChannelException {
392    
393                    ChannelHub channelHub = getChannelHub(companyId);
394    
395                    channelHub.unregisterChannelListener(userId, channelListener);
396            }
397    
398            private static final MethodKey _destroyChannelMethodKey =
399                    new MethodKey(
400                            ChannelHubManagerUtil.class, "destroyChannel", long.class,
401                            long.class);
402            private static final MethodKey _storeNotificationEventMethodKey =
403                    new MethodKey(
404                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
405                            long.class, NotificationEvent.class);
406    
407            private ChannelHub _channelHub;
408            private final ConcurrentMap<Long, ChannelHub> _channelHubs =
409                    new ConcurrentHashMap<Long, ChannelHub>();
410    
411    }