001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.notifications.Channel;
020 import com.liferay.portal.kernel.notifications.ChannelException;
021 import com.liferay.portal.kernel.notifications.ChannelHub;
022 import com.liferay.portal.kernel.notifications.ChannelHubManager;
023 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
024 import com.liferay.portal.kernel.notifications.ChannelListener;
025 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
028 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.List;
035 import java.util.concurrent.ConcurrentHashMap;
036 import java.util.concurrent.ConcurrentMap;
037
038
043 @DoPrivileged
044 public class ChannelHubManagerImpl implements ChannelHubManager {
045
046 @Override
047 public void confirmDelivery(
048 long companyId, long userId,
049 Collection<String> notificationEventUuids)
050 throws ChannelException {
051
052 confirmDelivery(companyId, userId, notificationEventUuids, false);
053 }
054
055 @Override
056 public void confirmDelivery(
057 long companyId, long userId,
058 Collection<String> notificationEventUuids, boolean archive)
059 throws ChannelException {
060
061 ChannelHub channelHub = getChannelHub(companyId);
062
063 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
064 }
065
066 @Override
067 public void confirmDelivery(
068 long companyId, long userId, String notificationEventUuid)
069 throws ChannelException {
070
071 confirmDelivery(companyId, userId, notificationEventUuid, false);
072 }
073
074 @Override
075 public void confirmDelivery(
076 long companyId, long userId, String notificationEventUuid,
077 boolean archive)
078 throws ChannelException {
079
080 ChannelHub channelHub = getChannelHub(companyId);
081
082 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
083 }
084
085 @Override
086 public Channel createChannel(long companyId, long userId)
087 throws ChannelException {
088
089 ChannelHub channelHub = getChannelHub(companyId);
090
091 return channelHub.createChannel(userId);
092 }
093
094 @Override
095 public ChannelHub createChannelHub(long companyId) throws ChannelException {
096 ChannelHub channelHub = _channelHub.clone(companyId);
097
098 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
099 throw new DuplicateChannelHubException(
100 "Channel already exists with company id " + companyId);
101 }
102
103 return channelHub;
104 }
105
106 @Override
107 public void deleteUserNotificiationEvent(
108 long companyId, long userId, String notificationEventUuid)
109 throws ChannelException {
110
111 ChannelHub channelHub = getChannelHub(companyId);
112
113 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
114 }
115
116 @Override
117 public void deleteUserNotificiationEvents(
118 long companyId, long userId,
119 Collection<String> notificationEventUuids)
120 throws ChannelException {
121
122 ChannelHub channelHub = getChannelHub(companyId);
123
124 channelHub.deleteUserNotificiationEvents(
125 userId, notificationEventUuids);
126 }
127
128 @Override
129 public void destroyChannel(long companyId, long userId)
130 throws ChannelException {
131
132 ChannelHub channelHub = getChannelHub(companyId);
133
134 channelHub.destroyChannel(userId);
135 }
136
137 @Override
138 public void destroyChannelHub(long companyId) throws ChannelException {
139 ChannelHub channelHub = _channelHubs.remove(companyId);
140
141 if (channelHub != null) {
142 channelHub.destroy();
143 }
144 }
145
146 @Override
147 public void destroyClusterChannel(long companyId, long userId)
148 throws ChannelException {
149
150 destroyChannel(companyId, userId);
151
152 MethodHandler methodHandler = new MethodHandler(
153 _destroyChannelMethodKey, companyId, userId);
154
155 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
156 methodHandler, true);
157
158 try {
159 ClusterExecutorUtil.execute(clusterRequest);
160 }
161 catch (Exception e) {
162 throw new ChannelException(
163 "Unable to destroy channel across cluster", e);
164 }
165 }
166
167 @Override
168 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
169 return fetchChannelHub(companyId, false);
170 }
171
172 @Override
173 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
174 throws ChannelException {
175
176 ChannelHub channelHub = _channelHubs.get(companyId);
177
178 if (channelHub == null) {
179 synchronized(_channelHubs) {
180 channelHub = _channelHubs.get(companyId);
181
182 if (channelHub == null) {
183 if (createIfAbsent) {
184 channelHub = createChannelHub(companyId);
185 }
186 }
187 }
188 }
189
190 return channelHub;
191 }
192
193 @Override
194 public List<NotificationEvent> fetchNotificationEvents(
195 long companyId, long userId, boolean flush)
196 throws ChannelException {
197
198 ChannelHub channelHub = fetchChannelHub(companyId);
199
200 if (channelHub == null) {
201 return Collections.emptyList();
202 }
203
204 return channelHub.fetchNotificationEvents(userId, flush);
205 }
206
207 @Override
208 public void flush() throws ChannelException {
209 for (ChannelHub channelHub : _channelHubs.values()) {
210 channelHub.flush();
211 }
212 }
213
214 @Override
215 public void flush(long companyId) throws ChannelException {
216 ChannelHub channelHub = fetchChannelHub(companyId);
217
218 if (channelHub != null) {
219 channelHub.flush();
220 }
221 }
222
223 @Override
224 public void flush(long companyId, long userId, long timestamp)
225 throws ChannelException {
226
227 ChannelHub channelHub = fetchChannelHub(companyId);
228
229 if (channelHub != null) {
230 channelHub.flush(userId, timestamp);
231 }
232 }
233
234 @Override
235 public Channel getChannel(long companyId, long userId)
236 throws ChannelException {
237
238 return getChannel(companyId, userId, false);
239 }
240
241 @Override
242 public Channel getChannel(
243 long companyId, long userId, boolean createIfAbsent)
244 throws ChannelException {
245
246 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
247
248 return channelHub.getChannel(userId, createIfAbsent);
249 }
250
251 @Override
252 public ChannelHub getChannelHub(long companyId) throws ChannelException {
253 return getChannelHub(companyId, false);
254 }
255
256 @Override
257 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
258 throws ChannelException {
259
260 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
261
262 if (channelHub == null) {
263 throw new UnknownChannelHubException(
264 "No channel exists with company id " + companyId);
265 }
266
267 return channelHub;
268 }
269
270 @Override
271 public List<NotificationEvent> getNotificationEvents(
272 long companyId, long userId)
273 throws ChannelException {
274
275 ChannelHub channelHub = getChannelHub(companyId);
276
277 return channelHub.getNotificationEvents(userId);
278 }
279
280 @Override
281 public List<NotificationEvent> getNotificationEvents(
282 long companyId, long userId, boolean flush)
283 throws ChannelException {
284
285 ChannelHub channelHub = getChannelHub(companyId);
286
287 return channelHub.getNotificationEvents(userId, flush);
288 }
289
290 @Override
291 public Collection<Long> getUserIds(long companyId) throws ChannelException {
292 ChannelHub channelHub = getChannelHub(companyId);
293
294 return channelHub.getUserIds();
295 }
296
297 @Override
298 public void registerChannelListener(
299 long companyId, long userId, ChannelListener channelListener)
300 throws ChannelException {
301
302 ChannelHub channelHub = getChannelHub(companyId);
303
304 channelHub.registerChannelListener(userId, channelListener);
305 }
306
307 @Override
308 public void removeTransientNotificationEvents(
309 long companyId, long userId,
310 Collection<NotificationEvent> notificationEvents)
311 throws ChannelException {
312
313 ChannelHub channelHub = getChannelHub(companyId);
314
315 channelHub.removeTransientNotificationEvents(
316 userId, notificationEvents);
317 }
318
319 @Override
320 public void removeTransientNotificationEventsByUuid(
321 long companyId, long userId,
322 Collection<String> notificationEventUuids)
323 throws ChannelException {
324
325 ChannelHub channelHub = getChannelHub(companyId);
326
327 channelHub.removeTransientNotificationEventsByUuid(
328 userId, notificationEventUuids);
329 }
330
331 @Override
332 public void sendClusterNotificationEvent(
333 long companyId, long userId, NotificationEvent notificationEvent)
334 throws ChannelException {
335
336 sendNotificationEvent(companyId, userId, notificationEvent);
337
338 MethodHandler methodHandler = new MethodHandler(
339 _storeNotificationEventMethodKey, companyId, userId,
340 notificationEvent);
341
342 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
343 methodHandler, true);
344
345 try {
346 ClusterExecutorUtil.execute(clusterRequest);
347 }
348 catch (Exception e) {
349 throw new ChannelException("Unable to notify cluster of event", e);
350 }
351 }
352
353 @Override
354 public void sendNotificationEvent(
355 long companyId, long userId, NotificationEvent notificationEvent)
356 throws ChannelException {
357
358 ChannelHub channelHub = getChannelHub(companyId);
359
360 channelHub.sendNotificationEvent(userId, notificationEvent);
361 }
362
363 @Override
364 public void sendNotificationEvents(
365 long companyId, long userId,
366 Collection<NotificationEvent> notificationEvents)
367 throws ChannelException {
368
369 ChannelHub channelHub = getChannelHub(companyId);
370
371 channelHub.sendNotificationEvents(userId, notificationEvents);
372 }
373
374 public void setChannelHubPrototype(ChannelHub channelHub) {
375 _channelHub = channelHub;
376 }
377
378 @Override
379 public void storeNotificationEvent(
380 long companyId, long userId, NotificationEvent notificationEvent)
381 throws ChannelException {
382
383 ChannelHub channelHub = getChannelHub(companyId);
384
385 channelHub.storeNotificationEvent(userId, notificationEvent);
386 }
387
388 @Override
389 public void unregisterChannelListener(
390 long companyId, long userId, ChannelListener channelListener)
391 throws ChannelException {
392
393 ChannelHub channelHub = getChannelHub(companyId);
394
395 channelHub.unregisterChannelListener(userId, channelListener);
396 }
397
398 private static final MethodKey _destroyChannelMethodKey =
399 new MethodKey(
400 ChannelHubManagerUtil.class, "destroyChannel", long.class,
401 long.class);
402 private static final MethodKey _storeNotificationEventMethodKey =
403 new MethodKey(
404 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
405 long.class, NotificationEvent.class);
406
407 private ChannelHub _channelHub;
408 private final ConcurrentMap<Long, ChannelHub> _channelHubs =
409 new ConcurrentHashMap<Long, ChannelHub>();
410
411 }