001
014
015 package com.liferay.portal.liveusers.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
019 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
020 import com.liferay.portal.kernel.cluster.ClusterRequest;
021 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
022 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
023 import com.liferay.portal.kernel.json.JSONFactoryUtil;
024 import com.liferay.portal.kernel.json.JSONObject;
025 import com.liferay.portal.kernel.log.Log;
026 import com.liferay.portal.kernel.log.LogFactoryUtil;
027 import com.liferay.portal.kernel.messaging.BaseMessageListener;
028 import com.liferay.portal.kernel.messaging.Message;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.liveusers.LiveUsers;
032
033 import java.util.Map;
034 import java.util.Set;
035 import java.util.concurrent.Future;
036
037
041 public class LiveUsersMessageListener extends BaseMessageListener {
042
043 protected void doCommandAddClusterNode(JSONObject jsonObject)
044 throws Exception {
045
046 String clusterNodeId = jsonObject.getString("clusterNodeId");
047
048 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
049 _getLocalClusterUsersMethodHandler, clusterNodeId);
050
051 FutureClusterResponses futureClusterResponses =
052 ClusterExecutorUtil.execute(clusterRequest);
053
054 futureClusterResponses.addFutureListener(
055 new LiveUsersClusterResponseFutureListener(clusterNodeId));
056 }
057
058 protected void doCommandRemoveClusterNode(JSONObject jsonObject)
059 throws Exception {
060
061 String clusterNodeId = jsonObject.getString("clusterNodeId");
062
063 LiveUsers.removeClusterNode(clusterNodeId);
064 }
065
066 protected void doCommandSignIn(JSONObject jsonObject) throws Exception {
067 String clusterNodeId = jsonObject.getString("clusterNodeId");
068 long companyId = jsonObject.getLong("companyId");
069 long userId = jsonObject.getLong("userId");
070 String sessionId = jsonObject.getString("sessionId");
071 String remoteAddr = jsonObject.getString("remoteAddr");
072 String remoteHost = jsonObject.getString("remoteHost");
073 String userAgent = jsonObject.getString("userAgent");
074
075 LiveUsers.signIn(
076 clusterNodeId, companyId, userId, sessionId, remoteAddr, remoteHost,
077 userAgent);
078 }
079
080 protected void doCommandSignOut(JSONObject jsonObject) throws Exception {
081 String clusterNodeId = jsonObject.getString("clusterNodeId");
082 long companyId = jsonObject.getLong("companyId");
083 long userId = jsonObject.getLong("userId");
084 String sessionId = jsonObject.getString("sessionId");
085
086 LiveUsers.signOut(clusterNodeId, companyId, userId, sessionId);
087 }
088
089 @Override
090 protected void doReceive(Message message) throws Exception {
091 String payload = (String)message.getPayload();
092
093 JSONObject jsonObject = JSONFactoryUtil.createJSONObject(payload);
094
095 String command = jsonObject.getString("command");
096
097 if (command.equals("addClusterNode")) {
098 doCommandAddClusterNode(jsonObject);
099 }
100 else if (command.equals("removeClusterNode")) {
101 doCommandRemoveClusterNode(jsonObject);
102 }
103 else if (command.equals("signIn")) {
104 doCommandSignIn(jsonObject);
105 }
106 else if (command.equals("signOut")) {
107 doCommandSignOut(jsonObject);
108 }
109 }
110
111 private static final Log _log = LogFactoryUtil.getLog(
112 LiveUsersMessageListener.class);
113
114 private static final MethodHandler _getLocalClusterUsersMethodHandler =
115 new MethodHandler(
116 new MethodKey(LiveUsers.class, "getLocalClusterUsers"));
117
118 private class LiveUsersClusterResponseFutureListener
119 extends BaseFutureListener<ClusterNodeResponses> {
120
121 public LiveUsersClusterResponseFutureListener(String clusterNodeId) {
122 _clusterNodeId = clusterNodeId;
123 }
124
125 @Override
126 public void completeWithException(
127 Future<ClusterNodeResponses> future, Throwable throwable) {
128
129 _log.error(
130 "Uanble to add cluster node " + _clusterNodeId, throwable);
131 }
132
133 @Override
134 public void completeWithResult(
135 Future<ClusterNodeResponses> future,
136 ClusterNodeResponses clusterNodeResponses) {
137
138 ClusterNodeResponse clusterNodeResponse =
139 clusterNodeResponses.getClusterResponse(_clusterNodeId);
140
141 try {
142 Object result = clusterNodeResponse.getResult();
143
144 if (result == null) {
145 return;
146 }
147
148 Map<Long, Map<Long, Set<String>>> clusterUsers =
149 (Map<Long, Map<Long, Set<String>>>)result;
150
151 LiveUsers.addClusterNode(_clusterNodeId, clusterUsers);
152 }
153 catch (Exception e) {
154 _log.error("Unable to add cluster node " + _clusterNodeId, e);
155 }
156 }
157
158 private final String _clusterNodeId;
159
160 }
161
162 }