001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019    import com.liferay.portal.kernel.cluster.ClusterRequest;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.notifications.Channel;
023    import com.liferay.portal.kernel.notifications.ChannelException;
024    import com.liferay.portal.kernel.notifications.ChannelHub;
025    import com.liferay.portal.kernel.notifications.ChannelHubManager;
026    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
027    import com.liferay.portal.kernel.notifications.ChannelListener;
028    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
029    import com.liferay.portal.kernel.notifications.NotificationEvent;
030    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
031    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
032    import com.liferay.portal.kernel.util.MethodHandler;
033    import com.liferay.portal.kernel.util.MethodKey;
034    
035    import java.util.Collection;
036    import java.util.Collections;
037    import java.util.List;
038    import java.util.concurrent.ConcurrentHashMap;
039    import java.util.concurrent.ConcurrentMap;
040    
041    /**
042     * @author Edward Han
043     * @author Brian Wing Shun
044     * @author Shuyang Zhou
045     */
046    @DoPrivileged
047    public class ChannelHubManagerImpl implements ChannelHubManager {
048    
049            @Override
050            public void confirmDelivery(
051                            long companyId, long userId,
052                            Collection<String> notificationEventUuids)
053                    throws ChannelException {
054    
055                    confirmDelivery(companyId, userId, notificationEventUuids, false);
056            }
057    
058            @Override
059            public void confirmDelivery(
060                            long companyId, long userId,
061                            Collection<String> notificationEventUuids, boolean archive)
062                    throws ChannelException {
063    
064                    ChannelHub channelHub = getChannelHub(companyId);
065    
066                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
067            }
068    
069            @Override
070            public void confirmDelivery(
071                            long companyId, long userId, String notificationEventUuid)
072                    throws ChannelException {
073    
074                    confirmDelivery(companyId, userId, notificationEventUuid, false);
075            }
076    
077            @Override
078            public void confirmDelivery(
079                            long companyId, long userId, String notificationEventUuid,
080                            boolean archive)
081                    throws ChannelException {
082    
083                    ChannelHub channelHub = getChannelHub(companyId);
084    
085                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
086            }
087    
088            @Override
089            public Channel createChannel(long companyId, long userId)
090                    throws ChannelException {
091    
092                    ChannelHub channelHub = getChannelHub(companyId);
093    
094                    return channelHub.createChannel(userId);
095            }
096    
097            @Override
098            public ChannelHub createChannelHub(long companyId) throws ChannelException {
099                    ChannelHub channelHub = _channelHub.clone(companyId);
100    
101                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
102                            throw new DuplicateChannelHubException(
103                                    "Channel already exists with company id " + companyId);
104                    }
105    
106                    return channelHub;
107            }
108    
109            @Override
110            public void deleteUserNotificiationEvent(
111                            long companyId, long userId, String notificationEventUuid)
112                    throws ChannelException {
113    
114                    ChannelHub channelHub = getChannelHub(companyId);
115    
116                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
117            }
118    
119            @Override
120            public void deleteUserNotificiationEvents(
121                            long companyId, long userId,
122                            Collection<String> notificationEventUuids)
123                    throws ChannelException {
124    
125                    ChannelHub channelHub = getChannelHub(companyId);
126    
127                    channelHub.deleteUserNotificiationEvents(
128                            userId, notificationEventUuids);
129            }
130    
131            @Override
132            public void destroyChannel(long companyId, long userId)
133                    throws ChannelException {
134    
135                    ChannelHub channelHub = fetchChannelHub(companyId);
136    
137                    if (channelHub != null) {
138                            channelHub.destroyChannel(userId);
139                    }
140    
141                    if (!ClusterInvokeThreadLocal.isEnabled()) {
142                            return;
143                    }
144    
145                    MethodHandler methodHandler = new MethodHandler(
146                            _destroyChannelMethodKey, companyId, userId);
147    
148                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
149                            methodHandler, true);
150    
151                    try {
152                            ClusterExecutorUtil.execute(clusterRequest);
153                    }
154                    catch (Exception e) {
155                            throw new ChannelException(
156                                    "Unable to destroy channel across cluster", e);
157                    }
158            }
159    
160            @Override
161            public void destroyChannelHub(long companyId) throws ChannelException {
162                    ChannelHub channelHub = _channelHubs.remove(companyId);
163    
164                    if (channelHub != null) {
165                            channelHub.destroy();
166                    }
167            }
168    
169            @Override
170            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
171                    return fetchChannelHub(companyId, false);
172            }
173    
174            @Override
175            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
176                    throws ChannelException {
177    
178                    ChannelHub channelHub = _channelHubs.get(companyId);
179    
180                    if (channelHub == null) {
181                            synchronized(_channelHubs) {
182                                    channelHub = _channelHubs.get(companyId);
183    
184                                    if (channelHub == null) {
185                                            if (createIfAbsent) {
186                                                    channelHub = createChannelHub(companyId);
187                                            }
188                                    }
189                            }
190                    }
191    
192                    return channelHub;
193            }
194    
195            @Override
196            public List<NotificationEvent> fetchNotificationEvents(
197                            long companyId, long userId, boolean flush)
198                    throws ChannelException {
199    
200                    ChannelHub channelHub = fetchChannelHub(companyId);
201    
202                    if (channelHub == null) {
203                            return Collections.emptyList();
204                    }
205    
206                    return channelHub.fetchNotificationEvents(userId, flush);
207            }
208    
209            @Override
210            public void flush() throws ChannelException {
211                    for (ChannelHub channelHub : _channelHubs.values()) {
212                            channelHub.flush();
213                    }
214            }
215    
216            @Override
217            public void flush(long companyId) throws ChannelException {
218                    ChannelHub channelHub = fetchChannelHub(companyId);
219    
220                    if (channelHub != null) {
221                            channelHub.flush();
222                    }
223            }
224    
225            @Override
226            public void flush(long companyId, long userId, long timestamp)
227                    throws ChannelException {
228    
229                    ChannelHub channelHub = fetchChannelHub(companyId);
230    
231                    if (channelHub != null) {
232                            channelHub.flush(userId, timestamp);
233                    }
234            }
235    
236            @Override
237            public Channel getChannel(long companyId, long userId)
238                    throws ChannelException {
239    
240                    return getChannel(companyId, userId, false);
241            }
242    
243            @Override
244            public Channel getChannel(
245                            long companyId, long userId, boolean createIfAbsent)
246                    throws ChannelException {
247    
248                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
249    
250                    return channelHub.getChannel(userId, createIfAbsent);
251            }
252    
253            @Override
254            public ChannelHub getChannelHub(long companyId) throws ChannelException {
255                    return getChannelHub(companyId, false);
256            }
257    
258            @Override
259            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
260                    throws ChannelException {
261    
262                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
263    
264                    if (channelHub == null) {
265                            throw new UnknownChannelHubException(
266                                    "No channel exists with company id " + companyId);
267                    }
268    
269                    return channelHub;
270            }
271    
272            @Override
273            public List<NotificationEvent> getNotificationEvents(
274                            long companyId, long userId)
275                    throws ChannelException {
276    
277                    ChannelHub channelHub = getChannelHub(companyId);
278    
279                    return channelHub.getNotificationEvents(userId);
280            }
281    
282            @Override
283            public List<NotificationEvent> getNotificationEvents(
284                            long companyId, long userId, boolean flush)
285                    throws ChannelException {
286    
287                    ChannelHub channelHub = getChannelHub(companyId);
288    
289                    return channelHub.getNotificationEvents(userId, flush);
290            }
291    
292            @Override
293            public Collection<Long> getUserIds(long companyId) throws ChannelException {
294                    ChannelHub channelHub = getChannelHub(companyId);
295    
296                    return channelHub.getUserIds();
297            }
298    
299            @Override
300            public void registerChannelListener(
301                            long companyId, long userId, ChannelListener channelListener)
302                    throws ChannelException {
303    
304                    ChannelHub channelHub = getChannelHub(companyId);
305    
306                    channelHub.registerChannelListener(userId, channelListener);
307            }
308    
309            @Override
310            public void removeTransientNotificationEvents(
311                            long companyId, long userId,
312                            Collection<NotificationEvent> notificationEvents)
313                    throws ChannelException {
314    
315                    ChannelHub channelHub = getChannelHub(companyId);
316    
317                    channelHub.removeTransientNotificationEvents(
318                            userId, notificationEvents);
319            }
320    
321            @Override
322            public void removeTransientNotificationEventsByUuid(
323                            long companyId, long userId,
324                            Collection<String> notificationEventUuids)
325                    throws ChannelException {
326    
327                    ChannelHub channelHub = getChannelHub(companyId);
328    
329                    channelHub.removeTransientNotificationEventsByUuid(
330                            userId, notificationEventUuids);
331            }
332    
333            @Override
334            public void sendNotificationEvent(
335                            long companyId, long userId, NotificationEvent notificationEvent)
336                    throws ChannelException {
337    
338                    ChannelHub channelHub = fetchChannelHub(companyId);
339    
340                    if (channelHub != null) {
341                            channelHub.sendNotificationEvent(userId, notificationEvent);
342                    }
343    
344                    if (!ClusterInvokeThreadLocal.isEnabled()) {
345                            return;
346                    }
347    
348                    MethodHandler methodHandler = new MethodHandler(
349                            _storeNotificationEventMethodKey, companyId, userId,
350                            notificationEvent);
351    
352                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
353                            methodHandler, true);
354    
355                    try {
356                            ClusterExecutorUtil.execute(clusterRequest);
357                    }
358                    catch (Exception e) {
359                            throw new ChannelException("Unable to notify cluster of event", e);
360                    }
361            }
362    
363            @Override
364            public void sendNotificationEvents(
365                            long companyId, long userId,
366                            Collection<NotificationEvent> notificationEvents)
367                    throws ChannelException {
368    
369                    ChannelHub channelHub = getChannelHub(companyId);
370    
371                    channelHub.sendNotificationEvents(userId, notificationEvents);
372            }
373    
374            public void setChannelHubPrototype(ChannelHub channelHub) {
375                    _channelHub = channelHub;
376            }
377    
378            @Override
379            public void storeNotificationEvent(
380                            long companyId, long userId, NotificationEvent notificationEvent)
381                    throws ChannelException {
382    
383                    ChannelHub channelHub = fetchChannelHub(companyId);
384    
385                    if (channelHub != null) {
386                            channelHub.storeNotificationEvent(userId, notificationEvent);
387                    }
388                    else if (_log.isDebugEnabled()) {
389                            _log.debug("No channel hub exists for company " + companyId);
390                    }
391            }
392    
393            @Override
394            public void unregisterChannelListener(
395                            long companyId, long userId, ChannelListener channelListener)
396                    throws ChannelException {
397    
398                    ChannelHub channelHub = getChannelHub(companyId);
399    
400                    channelHub.unregisterChannelListener(userId, channelListener);
401            }
402    
403            private static Log _log = LogFactoryUtil.getLog(
404                    ChannelHubManagerImpl.class);
405    
406            private static final MethodKey _destroyChannelMethodKey =
407                            new MethodKey(
408                                    ChannelHubManagerUtil.class, "destroyChannel", long.class,
409                                    long.class);
410    
411            private static final MethodKey _storeNotificationEventMethodKey =
412                    new MethodKey(
413                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
414                            long.class, NotificationEvent.class);
415    
416            private ChannelHub _channelHub;
417            private ConcurrentMap<Long, ChannelHub> _channelHubs =
418                    new ConcurrentHashMap<Long, ChannelHub>();
419    
420    }