001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019 import com.liferay.portal.kernel.cluster.ClusterRequest;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.notifications.Channel;
023 import com.liferay.portal.kernel.notifications.ChannelException;
024 import com.liferay.portal.kernel.notifications.ChannelHub;
025 import com.liferay.portal.kernel.notifications.ChannelHubManager;
026 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
027 import com.liferay.portal.kernel.notifications.ChannelListener;
028 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
029 import com.liferay.portal.kernel.notifications.NotificationEvent;
030 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
031 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
032 import com.liferay.portal.kernel.util.MethodHandler;
033 import com.liferay.portal.kernel.util.MethodKey;
034
035 import java.util.Collection;
036 import java.util.Collections;
037 import java.util.List;
038 import java.util.concurrent.ConcurrentHashMap;
039 import java.util.concurrent.ConcurrentMap;
040
041
046 @DoPrivileged
047 public class ChannelHubManagerImpl implements ChannelHubManager {
048
049 @Override
050 public void confirmDelivery(
051 long companyId, long userId,
052 Collection<String> notificationEventUuids)
053 throws ChannelException {
054
055 confirmDelivery(companyId, userId, notificationEventUuids, false);
056 }
057
058 @Override
059 public void confirmDelivery(
060 long companyId, long userId,
061 Collection<String> notificationEventUuids, boolean archive)
062 throws ChannelException {
063
064 ChannelHub channelHub = getChannelHub(companyId);
065
066 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
067 }
068
069 @Override
070 public void confirmDelivery(
071 long companyId, long userId, String notificationEventUuid)
072 throws ChannelException {
073
074 confirmDelivery(companyId, userId, notificationEventUuid, false);
075 }
076
077 @Override
078 public void confirmDelivery(
079 long companyId, long userId, String notificationEventUuid,
080 boolean archive)
081 throws ChannelException {
082
083 ChannelHub channelHub = getChannelHub(companyId);
084
085 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
086 }
087
088 @Override
089 public Channel createChannel(long companyId, long userId)
090 throws ChannelException {
091
092 ChannelHub channelHub = getChannelHub(companyId);
093
094 return channelHub.createChannel(userId);
095 }
096
097 @Override
098 public ChannelHub createChannelHub(long companyId) throws ChannelException {
099 ChannelHub channelHub = new ChannelHubImpl(companyId);
100
101 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
102 throw new DuplicateChannelHubException(
103 "Channel already exists with company id " + companyId);
104 }
105
106 return channelHub;
107 }
108
109 @Override
110 public void deleteUserNotificiationEvent(
111 long companyId, long userId, String notificationEventUuid)
112 throws ChannelException {
113
114 ChannelHub channelHub = getChannelHub(companyId);
115
116 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
117 }
118
119 @Override
120 public void deleteUserNotificiationEvents(
121 long companyId, long userId,
122 Collection<String> notificationEventUuids)
123 throws ChannelException {
124
125 ChannelHub channelHub = getChannelHub(companyId);
126
127 channelHub.deleteUserNotificiationEvents(
128 userId, notificationEventUuids);
129 }
130
131 @Override
132 public void destroyChannel(long companyId, long userId)
133 throws ChannelException {
134
135 ChannelHub channelHub = getChannelHub(companyId);
136
137 channelHub.destroyChannel(userId);
138
139 if (!ClusterInvokeThreadLocal.isEnabled()) {
140 return;
141 }
142
143 MethodHandler methodHandler = new MethodHandler(
144 _destroyChannelMethodKey, companyId, userId);
145
146 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
147 methodHandler, true);
148
149 try {
150 ClusterExecutorUtil.execute(clusterRequest);
151 }
152 catch (Exception e) {
153 throw new ChannelException(
154 "Unable to destroy channel across cluster", e);
155 }
156 }
157
158 @Override
159 public void destroyChannelHub(long companyId) throws ChannelException {
160 ChannelHub channelHub = _channelHubs.remove(companyId);
161
162 if (channelHub != null) {
163 channelHub.destroy();
164 }
165 }
166
167 @Override
168 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
169 return fetchChannelHub(companyId, false);
170 }
171
172 @Override
173 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
174 throws ChannelException {
175
176 ChannelHub channelHub = _channelHubs.get(companyId);
177
178 if (channelHub == null) {
179 synchronized(_channelHubs) {
180 channelHub = _channelHubs.get(companyId);
181
182 if (channelHub == null) {
183 if (createIfAbsent) {
184 channelHub = createChannelHub(companyId);
185 }
186 }
187 }
188 }
189
190 return channelHub;
191 }
192
193 @Override
194 public List<NotificationEvent> fetchNotificationEvents(
195 long companyId, long userId, boolean flush)
196 throws ChannelException {
197
198 ChannelHub channelHub = fetchChannelHub(companyId);
199
200 if (channelHub == null) {
201 return Collections.emptyList();
202 }
203
204 return channelHub.fetchNotificationEvents(userId, flush);
205 }
206
207 @Override
208 public void flush() throws ChannelException {
209 for (ChannelHub channelHub : _channelHubs.values()) {
210 channelHub.flush();
211 }
212 }
213
214 @Override
215 public void flush(long companyId) throws ChannelException {
216 ChannelHub channelHub = fetchChannelHub(companyId);
217
218 if (channelHub != null) {
219 channelHub.flush();
220 }
221 }
222
223 @Override
224 public void flush(long companyId, long userId, long timestamp)
225 throws ChannelException {
226
227 ChannelHub channelHub = fetchChannelHub(companyId);
228
229 if (channelHub != null) {
230 channelHub.flush(userId, timestamp);
231 }
232 }
233
234 @Override
235 public Channel getChannel(long companyId, long userId)
236 throws ChannelException {
237
238 return getChannel(companyId, userId, false);
239 }
240
241 @Override
242 public Channel getChannel(
243 long companyId, long userId, boolean createIfAbsent)
244 throws ChannelException {
245
246 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
247
248 return channelHub.getChannel(userId, createIfAbsent);
249 }
250
251 @Override
252 public ChannelHub getChannelHub(long companyId) throws ChannelException {
253 return getChannelHub(companyId, false);
254 }
255
256 @Override
257 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
258 throws ChannelException {
259
260 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
261
262 if (channelHub == null) {
263 throw new UnknownChannelHubException(
264 "No channel exists with company id " + companyId);
265 }
266
267 return channelHub;
268 }
269
270 @Override
271 public List<NotificationEvent> getNotificationEvents(
272 long companyId, long userId)
273 throws ChannelException {
274
275 ChannelHub channelHub = getChannelHub(companyId);
276
277 return channelHub.getNotificationEvents(userId);
278 }
279
280 @Override
281 public List<NotificationEvent> getNotificationEvents(
282 long companyId, long userId, boolean flush)
283 throws ChannelException {
284
285 ChannelHub channelHub = getChannelHub(companyId);
286
287 return channelHub.getNotificationEvents(userId, flush);
288 }
289
290 @Override
291 public Collection<Long> getUserIds(long companyId) throws ChannelException {
292 ChannelHub channelHub = getChannelHub(companyId);
293
294 return channelHub.getUserIds();
295 }
296
297 @Override
298 public void registerChannelListener(
299 long companyId, long userId, ChannelListener channelListener)
300 throws ChannelException {
301
302 ChannelHub channelHub = getChannelHub(companyId);
303
304 channelHub.registerChannelListener(userId, channelListener);
305 }
306
307 @Override
308 public void removeTransientNotificationEvents(
309 long companyId, long userId,
310 Collection<NotificationEvent> notificationEvents)
311 throws ChannelException {
312
313 ChannelHub channelHub = getChannelHub(companyId);
314
315 channelHub.removeTransientNotificationEvents(
316 userId, notificationEvents);
317 }
318
319 @Override
320 public void removeTransientNotificationEventsByUuid(
321 long companyId, long userId,
322 Collection<String> notificationEventUuids)
323 throws ChannelException {
324
325 ChannelHub channelHub = getChannelHub(companyId);
326
327 channelHub.removeTransientNotificationEventsByUuid(
328 userId, notificationEventUuids);
329 }
330
331 @Override
332 public void sendNotificationEvent(
333 long companyId, long userId, NotificationEvent notificationEvent)
334 throws ChannelException {
335
336 ChannelHub channelHub = getChannelHub(companyId);
337
338 channelHub.sendNotificationEvent(userId, notificationEvent);
339
340 if (!ClusterInvokeThreadLocal.isEnabled()) {
341 return;
342 }
343
344 MethodHandler methodHandler = new MethodHandler(
345 _storeNotificationEventMethodKey, companyId, userId,
346 notificationEvent);
347
348 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
349 methodHandler, true);
350
351 try {
352 ClusterExecutorUtil.execute(clusterRequest);
353 }
354 catch (Exception e) {
355 throw new ChannelException("Unable to notify cluster of event", e);
356 }
357 }
358
359 @Override
360 public void sendNotificationEvents(
361 long companyId, long userId,
362 Collection<NotificationEvent> notificationEvents)
363 throws ChannelException {
364
365 ChannelHub channelHub = getChannelHub(companyId);
366
367 channelHub.sendNotificationEvents(userId, notificationEvents);
368 }
369
370 @Override
371 public void storeNotificationEvent(
372 long companyId, long userId, NotificationEvent notificationEvent)
373 throws ChannelException {
374
375 ChannelHub channelHub = fetchChannelHub(companyId);
376
377 if (channelHub != null) {
378 channelHub.storeNotificationEvent(userId, notificationEvent);
379 }
380 else if (_log.isDebugEnabled()) {
381 _log.debug("No channel hub exists for company " + companyId);
382 }
383 }
384
385 @Override
386 public void unregisterChannelListener(
387 long companyId, long userId, ChannelListener channelListener)
388 throws ChannelException {
389
390 ChannelHub channelHub = getChannelHub(companyId);
391
392 channelHub.unregisterChannelListener(userId, channelListener);
393 }
394
395 private static final Log _log = LogFactoryUtil.getLog(
396 ChannelHubManagerImpl.class);
397
398 private static final MethodKey _destroyChannelMethodKey = new MethodKey(
399 ChannelHubManagerUtil.class, "destroyChannel", long.class, long.class);
400 private static final MethodKey _storeNotificationEventMethodKey =
401 new MethodKey(
402 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
403 long.class, NotificationEvent.class);
404
405 private final ConcurrentMap<Long, ChannelHub> _channelHubs =
406 new ConcurrentHashMap<>();
407
408 }