001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019 import com.liferay.portal.kernel.cluster.ClusterRequest;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.notifications.Channel;
023 import com.liferay.portal.kernel.notifications.ChannelException;
024 import com.liferay.portal.kernel.notifications.ChannelHub;
025 import com.liferay.portal.kernel.notifications.ChannelHubManager;
026 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
027 import com.liferay.portal.kernel.notifications.ChannelListener;
028 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
029 import com.liferay.portal.kernel.notifications.NotificationEvent;
030 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
031 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
032 import com.liferay.portal.kernel.util.MethodHandler;
033 import com.liferay.portal.kernel.util.MethodKey;
034
035 import java.util.Collection;
036 import java.util.Collections;
037 import java.util.List;
038 import java.util.concurrent.ConcurrentHashMap;
039 import java.util.concurrent.ConcurrentMap;
040
041
046 @DoPrivileged
047 public class ChannelHubManagerImpl implements ChannelHubManager {
048
049 @Override
050 public void confirmDelivery(
051 long companyId, long userId,
052 Collection<String> notificationEventUuids)
053 throws ChannelException {
054
055 confirmDelivery(companyId, userId, notificationEventUuids, false);
056 }
057
058 @Override
059 public void confirmDelivery(
060 long companyId, long userId,
061 Collection<String> notificationEventUuids, boolean archive)
062 throws ChannelException {
063
064 ChannelHub channelHub = getChannelHub(companyId);
065
066 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
067 }
068
069 @Override
070 public void confirmDelivery(
071 long companyId, long userId, String notificationEventUuid)
072 throws ChannelException {
073
074 confirmDelivery(companyId, userId, notificationEventUuid, false);
075 }
076
077 @Override
078 public void confirmDelivery(
079 long companyId, long userId, String notificationEventUuid,
080 boolean archive)
081 throws ChannelException {
082
083 ChannelHub channelHub = getChannelHub(companyId);
084
085 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
086 }
087
088 @Override
089 public Channel createChannel(long companyId, long userId)
090 throws ChannelException {
091
092 ChannelHub channelHub = getChannelHub(companyId);
093
094 return channelHub.createChannel(userId);
095 }
096
097 @Override
098 public ChannelHub createChannelHub(long companyId) throws ChannelException {
099 ChannelHub channelHub = _channelHub.clone(companyId);
100
101 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
102 throw new DuplicateChannelHubException(
103 "Channel already exists with company id " + companyId);
104 }
105
106 return channelHub;
107 }
108
109 @Override
110 public void deleteUserNotificiationEvent(
111 long companyId, long userId, String notificationEventUuid)
112 throws ChannelException {
113
114 ChannelHub channelHub = getChannelHub(companyId);
115
116 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
117 }
118
119 @Override
120 public void deleteUserNotificiationEvents(
121 long companyId, long userId,
122 Collection<String> notificationEventUuids)
123 throws ChannelException {
124
125 ChannelHub channelHub = getChannelHub(companyId);
126
127 channelHub.deleteUserNotificiationEvents(
128 userId, notificationEventUuids);
129 }
130
131 @Override
132 public void destroyChannel(long companyId, long userId)
133 throws ChannelException {
134
135 ChannelHub channelHub = fetchChannelHub(companyId);
136
137 if (channelHub != null) {
138 channelHub.destroyChannel(userId);
139 }
140
141 if (!ClusterInvokeThreadLocal.isEnabled()) {
142 return;
143 }
144
145 MethodHandler methodHandler = new MethodHandler(
146 _destroyChannelMethodKey, companyId, userId);
147
148 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
149 methodHandler, true);
150
151 try {
152 ClusterExecutorUtil.execute(clusterRequest);
153 }
154 catch (Exception e) {
155 throw new ChannelException(
156 "Unable to destroy channel across cluster", e);
157 }
158 }
159
160 @Override
161 public void destroyChannelHub(long companyId) throws ChannelException {
162 ChannelHub channelHub = _channelHubs.remove(companyId);
163
164 if (channelHub != null) {
165 channelHub.destroy();
166 }
167 }
168
169 @Override
170 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
171 return fetchChannelHub(companyId, false);
172 }
173
174 @Override
175 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
176 throws ChannelException {
177
178 ChannelHub channelHub = _channelHubs.get(companyId);
179
180 if (channelHub == null) {
181 synchronized(_channelHubs) {
182 channelHub = _channelHubs.get(companyId);
183
184 if (channelHub == null) {
185 if (createIfAbsent) {
186 channelHub = createChannelHub(companyId);
187 }
188 }
189 }
190 }
191
192 return channelHub;
193 }
194
195 @Override
196 public List<NotificationEvent> fetchNotificationEvents(
197 long companyId, long userId, boolean flush)
198 throws ChannelException {
199
200 ChannelHub channelHub = fetchChannelHub(companyId);
201
202 if (channelHub == null) {
203 return Collections.emptyList();
204 }
205
206 return channelHub.fetchNotificationEvents(userId, flush);
207 }
208
209 @Override
210 public void flush() throws ChannelException {
211 for (ChannelHub channelHub : _channelHubs.values()) {
212 channelHub.flush();
213 }
214 }
215
216 @Override
217 public void flush(long companyId) throws ChannelException {
218 ChannelHub channelHub = fetchChannelHub(companyId);
219
220 if (channelHub != null) {
221 channelHub.flush();
222 }
223 }
224
225 @Override
226 public void flush(long companyId, long userId, long timestamp)
227 throws ChannelException {
228
229 ChannelHub channelHub = fetchChannelHub(companyId);
230
231 if (channelHub != null) {
232 channelHub.flush(userId, timestamp);
233 }
234 }
235
236 @Override
237 public Channel getChannel(long companyId, long userId)
238 throws ChannelException {
239
240 return getChannel(companyId, userId, false);
241 }
242
243 @Override
244 public Channel getChannel(
245 long companyId, long userId, boolean createIfAbsent)
246 throws ChannelException {
247
248 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
249
250 return channelHub.getChannel(userId, createIfAbsent);
251 }
252
253 @Override
254 public ChannelHub getChannelHub(long companyId) throws ChannelException {
255 return getChannelHub(companyId, false);
256 }
257
258 @Override
259 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
260 throws ChannelException {
261
262 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
263
264 if (channelHub == null) {
265 throw new UnknownChannelHubException(
266 "No channel exists with company id " + companyId);
267 }
268
269 return channelHub;
270 }
271
272 @Override
273 public List<NotificationEvent> getNotificationEvents(
274 long companyId, long userId)
275 throws ChannelException {
276
277 ChannelHub channelHub = getChannelHub(companyId);
278
279 return channelHub.getNotificationEvents(userId);
280 }
281
282 @Override
283 public List<NotificationEvent> getNotificationEvents(
284 long companyId, long userId, boolean flush)
285 throws ChannelException {
286
287 ChannelHub channelHub = getChannelHub(companyId);
288
289 return channelHub.getNotificationEvents(userId, flush);
290 }
291
292 @Override
293 public Collection<Long> getUserIds(long companyId) throws ChannelException {
294 ChannelHub channelHub = getChannelHub(companyId);
295
296 return channelHub.getUserIds();
297 }
298
299 @Override
300 public void registerChannelListener(
301 long companyId, long userId, ChannelListener channelListener)
302 throws ChannelException {
303
304 ChannelHub channelHub = getChannelHub(companyId);
305
306 channelHub.registerChannelListener(userId, channelListener);
307 }
308
309 @Override
310 public void removeTransientNotificationEvents(
311 long companyId, long userId,
312 Collection<NotificationEvent> notificationEvents)
313 throws ChannelException {
314
315 ChannelHub channelHub = getChannelHub(companyId);
316
317 channelHub.removeTransientNotificationEvents(
318 userId, notificationEvents);
319 }
320
321 @Override
322 public void removeTransientNotificationEventsByUuid(
323 long companyId, long userId,
324 Collection<String> notificationEventUuids)
325 throws ChannelException {
326
327 ChannelHub channelHub = getChannelHub(companyId);
328
329 channelHub.removeTransientNotificationEventsByUuid(
330 userId, notificationEventUuids);
331 }
332
333 @Override
334 public void sendNotificationEvent(
335 long companyId, long userId, NotificationEvent notificationEvent)
336 throws ChannelException {
337
338 ChannelHub channelHub = fetchChannelHub(companyId);
339
340 if (channelHub != null) {
341 channelHub.sendNotificationEvent(userId, notificationEvent);
342 }
343
344 if (!ClusterInvokeThreadLocal.isEnabled()) {
345 return;
346 }
347
348 MethodHandler methodHandler = new MethodHandler(
349 _storeNotificationEventMethodKey, companyId, userId,
350 notificationEvent);
351
352 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
353 methodHandler, true);
354
355 try {
356 ClusterExecutorUtil.execute(clusterRequest);
357 }
358 catch (Exception e) {
359 throw new ChannelException("Unable to notify cluster of event", e);
360 }
361 }
362
363 @Override
364 public void sendNotificationEvents(
365 long companyId, long userId,
366 Collection<NotificationEvent> notificationEvents)
367 throws ChannelException {
368
369 ChannelHub channelHub = getChannelHub(companyId);
370
371 channelHub.sendNotificationEvents(userId, notificationEvents);
372 }
373
374 public void setChannelHubPrototype(ChannelHub channelHub) {
375 _channelHub = channelHub;
376 }
377
378 @Override
379 public void storeNotificationEvent(
380 long companyId, long userId, NotificationEvent notificationEvent)
381 throws ChannelException {
382
383 ChannelHub channelHub = fetchChannelHub(companyId);
384
385 if (channelHub != null) {
386 channelHub.storeNotificationEvent(userId, notificationEvent);
387 }
388 else if (_log.isDebugEnabled()) {
389 _log.debug("No channel hub exists for company " + companyId);
390 }
391 }
392
393 @Override
394 public void unregisterChannelListener(
395 long companyId, long userId, ChannelListener channelListener)
396 throws ChannelException {
397
398 ChannelHub channelHub = getChannelHub(companyId);
399
400 channelHub.unregisterChannelListener(userId, channelListener);
401 }
402
403 private static Log _log = LogFactoryUtil.getLog(
404 ChannelHubManagerImpl.class);
405
406 private static final MethodKey _destroyChannelMethodKey =
407 new MethodKey(
408 ChannelHubManagerUtil.class, "destroyChannel", long.class,
409 long.class);
410
411 private static final MethodKey _storeNotificationEventMethodKey =
412 new MethodKey(
413 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
414 long.class, NotificationEvent.class);
415
416 private ChannelHub _channelHub;
417 private ConcurrentMap<Long, ChannelHub> _channelHubs =
418 new ConcurrentHashMap<Long, ChannelHub>();
419
420 }