001
014
015 package com.liferay.portal.fabric.netty.repository;
016
017 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018 import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019 import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020 import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
021 import com.liferay.portal.fabric.netty.util.NettyUtil;
022 import com.liferay.portal.fabric.repository.Repository;
023 import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
024 import com.liferay.portal.kernel.concurrent.AsyncBroker;
025 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026 import com.liferay.portal.kernel.concurrent.FutureListener;
027 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
028 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031
032 import io.netty.channel.Channel;
033 import io.netty.channel.ChannelFuture;
034 import io.netty.channel.ChannelFutureListener;
035 import io.netty.channel.ChannelPipeline;
036 import io.netty.util.concurrent.EventExecutorGroup;
037
038 import java.io.IOException;
039
040 import java.nio.file.Files;
041 import java.nio.file.Path;
042 import java.nio.file.attribute.FileTime;
043
044 import java.util.Map;
045 import java.util.concurrent.ConcurrentHashMap;
046 import java.util.concurrent.ExecutionException;
047 import java.util.concurrent.Future;
048 import java.util.concurrent.atomic.AtomicInteger;
049
050
053 public class NettyRepository implements Repository {
054
055 public NettyRepository(
056 Path repositoryPath, Channel channel,
057 EventExecutorGroup eventExecutorGroup, long getFileTimeout) {
058
059 if (repositoryPath == null) {
060 throw new NullPointerException("Repository path is null");
061 }
062
063 if (channel == null) {
064 throw new NullPointerException("Channel is null");
065 }
066
067 if (eventExecutorGroup == null) {
068 throw new NullPointerException("Event executor group is null");
069 }
070
071 if (!Files.isDirectory(repositoryPath)) {
072 throw new IllegalArgumentException(
073 repositoryPath + " is not a directory");
074 }
075
076 this.repositoryPath = repositoryPath;
077 this.channel = channel;
078 this.getFileTimeout = getFileTimeout;
079
080 ChannelPipeline channelPipeline = channel.pipeline();
081
082 channelPipeline.addLast(
083 new FileResponseChannelHandler(asyncBroker, eventExecutorGroup));
084 }
085
086 @Override
087 public void dispose(boolean delete) {
088 for (Path path : pathMap.values()) {
089 FileHelperUtil.delete(true, path);
090 }
091
092 pathMap.clear();
093
094 if (delete) {
095 FileHelperUtil.delete(true, repositoryPath);
096 }
097 }
098
099 @Override
100 public NoticeableFuture<Path> getFile(
101 Path remoteFilePath, Path localFilePath, boolean deleteAfterFetch) {
102
103 if (localFilePath == null) {
104 return getFile(
105 remoteFilePath,
106 RepositoryHelperUtil.getRepositoryFilePath(
107 repositoryPath, remoteFilePath),
108 deleteAfterFetch, true);
109 }
110
111 return getFile(remoteFilePath, localFilePath, deleteAfterFetch, false);
112 }
113
114 @Override
115 public NoticeableFuture<Map<Path, Path>> getFiles(
116 Map<Path, Path> pathMap, boolean deleteAfterFetch) {
117
118 final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
119 new DefaultNoticeableFuture<Map<Path, Path>>();
120
121 if (pathMap.isEmpty()) {
122 defaultNoticeableFuture.set(pathMap);
123
124 return defaultNoticeableFuture;
125 }
126
127 final Map<Path, Path> resultPathMap =
128 new ConcurrentHashMap<Path, Path>();
129
130 final AtomicInteger counter = new AtomicInteger(pathMap.size());
131
132 for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
133 final Path remoteFilePath = entry.getKey();
134
135 NoticeableFuture<Path> noticeableFuture = getFile(
136 remoteFilePath, entry.getValue(), deleteAfterFetch);
137
138 noticeableFuture.addFutureListener(
139 new FutureListener<Path>() {
140
141 @Override
142 public void complete(Future<Path> future) {
143 if (future.isCancelled()) {
144 defaultNoticeableFuture.cancel(true);
145
146 return;
147 }
148
149 try {
150 Path localFilePath = future.get();
151
152 if (localFilePath != null) {
153 resultPathMap.put(
154 remoteFilePath, localFilePath);
155 }
156
157 if (counter.decrementAndGet() <= 0) {
158 defaultNoticeableFuture.set(resultPathMap);
159 }
160 }
161 catch (Throwable t) {
162 if (t instanceof ExecutionException) {
163 t = t.getCause();
164 }
165
166 defaultNoticeableFuture.setException(t);
167 }
168 }
169
170 });
171 }
172
173 return defaultNoticeableFuture;
174 }
175
176 @Override
177 public Path getRepositoryPath() {
178 return repositoryPath;
179 }
180
181 protected static long getLastModifiedTime(Path path) {
182 if (path == null) {
183 return Long.MIN_VALUE;
184 }
185
186 try {
187 FileTime fileTime = Files.getLastModifiedTime(path);
188
189 return fileTime.toMillis();
190 }
191 catch (IOException ioe) {
192 return Long.MIN_VALUE;
193 }
194 }
195
196 protected NoticeableFuture<Path> getFile(
197 final Path remoteFilePath, final Path localFilePath,
198 boolean deleteAfterFetch, final boolean populateCache) {
199
200 if (_log.isDebugEnabled()) {
201 _log.debug("Fetching remote file " + remoteFilePath);
202 }
203
204 final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
205
206 final NoticeableFuture<FileResponse> noticeableFuture =
207 asyncBroker.post(remoteFilePath);
208
209 NettyUtil.scheduleCancellation(
210 channel, noticeableFuture, getFileTimeout);
211
212 ChannelFuture channelFuture = channel.writeAndFlush(
213 new FileRequest(
214 remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
215 deleteAfterFetch));
216
217 channelFuture.addListener(
218 new ChannelFutureListener() {
219
220 @Override
221 public void operationComplete(ChannelFuture channelFuture) {
222 if (channelFuture.isSuccess()) {
223 return;
224 }
225
226 if (channelFuture.isCancelled()) {
227 noticeableFuture.cancel(true);
228
229 return;
230 }
231
232 Throwable throwable = new IOException(
233 "Unable to fetch remote file " + remoteFilePath,
234 channelFuture.cause());
235
236 if (!asyncBroker.takeWithException(
237 remoteFilePath, throwable)) {
238
239 _log.error(
240 "Unable to place exception because no future " +
241 "exists with ID " + remoteFilePath,
242 throwable);
243 }
244 }
245
246 });
247
248 return new NoticeableFutureConverter<Path, FileResponse>(
249 noticeableFuture) {
250
251 @Override
252 protected Path convert(FileResponse fileResponse)
253 throws IOException {
254
255 if (fileResponse.isFileNotFound()) {
256 if (_log.isWarnEnabled()) {
257 _log.warn(
258 "Remote file " + remoteFilePath +
259 " is not found");
260 }
261
262 return null;
263 }
264
265 if (fileResponse.isFileNotModified()) {
266 if (_log.isDebugEnabled()) {
267 _log.debug(
268 "Remote file " + remoteFilePath +
269 " is not modified, use cached local file " +
270 cachedLocalFilePath);
271 }
272
273 return cachedLocalFilePath;
274 }
275
276 FileHelperUtil.move(
277 fileResponse.getLocalFile(), localFilePath);
278
279 if (populateCache) {
280 pathMap.put(remoteFilePath, localFilePath);
281 }
282
283 if (_log.isDebugEnabled()) {
284 _log.debug(
285 "Fetched remote file " + remoteFilePath + " to " +
286 localFilePath);
287 }
288
289 return localFilePath;
290 }
291
292 };
293 }
294
295 protected final AsyncBroker<Path, FileResponse> asyncBroker =
296 new AsyncBroker<Path, FileResponse>();
297 protected final Channel channel;
298 protected final long getFileTimeout;
299 protected final Map<Path, Path> pathMap =
300 new ConcurrentHashMap<Path, Path>();
301 protected final Path repositoryPath;
302
303 private static final Log _log = LogFactoryUtil.getLog(
304 NettyRepository.class);
305
306 }