001
014
015 package com.liferay.portal.fabric.netty.repository;
016
017 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018 import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019 import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020 import com.liferay.portal.fabric.netty.util.NettyUtil;
021 import com.liferay.portal.fabric.repository.Repository;
022 import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
023 import com.liferay.portal.kernel.concurrent.AsyncBroker;
024 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
025 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
027 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030
031 import io.netty.channel.Channel;
032 import io.netty.channel.ChannelFuture;
033 import io.netty.channel.ChannelFutureListener;
034
035 import java.io.IOException;
036
037 import java.nio.file.Files;
038 import java.nio.file.Path;
039 import java.nio.file.StandardCopyOption;
040 import java.nio.file.attribute.FileTime;
041
042 import java.util.Map;
043 import java.util.concurrent.ConcurrentHashMap;
044 import java.util.concurrent.Future;
045 import java.util.concurrent.atomic.AtomicInteger;
046
047
050 public class NettyRepository implements Repository<Channel> {
051
052 public NettyRepository(Path repositoryPath, long getFileTimeout) {
053 if (repositoryPath == null) {
054 throw new NullPointerException("Repository path is null");
055 }
056
057 if (!Files.isDirectory(repositoryPath)) {
058 throw new IllegalArgumentException(
059 repositoryPath + " is not a directory");
060 }
061
062 this.repositoryPath = repositoryPath;
063 this.getFileTimeout = getFileTimeout;
064 }
065
066 @Override
067 public void dispose(boolean delete) {
068 for (Path path : pathMap.values()) {
069 FileHelperUtil.delete(true, path);
070 }
071
072 pathMap.clear();
073
074 if (delete) {
075 FileHelperUtil.delete(true, repositoryPath);
076 }
077 }
078
079 @Override
080 public AsyncBroker<Path, FileResponse> getAsyncBroker() {
081 return asyncBroker;
082 }
083
084 @Override
085 public NoticeableFuture<Path> getFile(
086 Channel channel, Path remoteFilePath, Path localFilePath,
087 boolean deleteAfterFetch) {
088
089 if (localFilePath == null) {
090 return getFile(
091 channel, remoteFilePath,
092 RepositoryHelperUtil.getRepositoryFilePath(
093 repositoryPath, remoteFilePath),
094 deleteAfterFetch, true);
095 }
096
097 return getFile(
098 channel, remoteFilePath, localFilePath, deleteAfterFetch, false);
099 }
100
101 @Override
102 public NoticeableFuture<Map<Path, Path>> getFiles(
103 Channel channel, Map<Path, Path> pathMap, boolean deleteAfterFetch) {
104
105 final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
106 new DefaultNoticeableFuture<Map<Path, Path>>();
107
108 if (pathMap.isEmpty()) {
109 defaultNoticeableFuture.set(pathMap);
110
111 return defaultNoticeableFuture;
112 }
113
114 final Map<Path, Path> resultPathMap =
115 new ConcurrentHashMap<Path, Path>();
116
117 final AtomicInteger counter = new AtomicInteger(pathMap.size());
118
119 for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
120 final Path remoteFilePath = entry.getKey();
121
122 NoticeableFuture<Path> noticeableFuture = getFile(
123 channel, remoteFilePath, entry.getValue(), deleteAfterFetch);
124
125 noticeableFuture.addFutureListener(
126 new BaseFutureListener<Path>() {
127
128 @Override
129 public void completeWithCancel(Future<Path> future) {
130 defaultNoticeableFuture.cancel(true);
131 }
132
133 @Override
134 public void completeWithException(
135 Future<Path> future, Throwable throwable) {
136
137 defaultNoticeableFuture.setException(throwable);
138 }
139
140 @Override
141 public void completeWithResult(
142 Future<Path> future, Path localFilePath) {
143
144 if (localFilePath != null) {
145 resultPathMap.put(remoteFilePath, localFilePath);
146 }
147
148 if (counter.decrementAndGet() <= 0) {
149 defaultNoticeableFuture.set(resultPathMap);
150 }
151 }
152
153 });
154 }
155
156 return defaultNoticeableFuture;
157 }
158
159 @Override
160 public Path getRepositoryPath() {
161 return repositoryPath;
162 }
163
164 protected static long getLastModifiedTime(Path path) {
165 if (path == null) {
166 return Long.MIN_VALUE;
167 }
168
169 try {
170 FileTime fileTime = Files.getLastModifiedTime(path);
171
172 return fileTime.toMillis();
173 }
174 catch (IOException ioe) {
175 return Long.MIN_VALUE;
176 }
177 }
178
179 protected NoticeableFuture<Path> getFile(
180 Channel channel, final Path remoteFilePath, final Path localFilePath,
181 boolean deleteAfterFetch, final boolean populateCache) {
182
183 if (_log.isDebugEnabled()) {
184 _log.debug("Fetching remote file " + remoteFilePath);
185 }
186
187 final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
188
189 final DefaultNoticeableFuture<FileResponse> defaultNoticeableFuture =
190 new DefaultNoticeableFuture<FileResponse>();
191
192 NoticeableFuture<FileResponse> noticeableFuture = asyncBroker.post(
193 remoteFilePath, defaultNoticeableFuture);
194
195 if (noticeableFuture == null) {
196 noticeableFuture = defaultNoticeableFuture;
197
198 NettyUtil.scheduleCancellation(
199 channel, defaultNoticeableFuture, getFileTimeout);
200
201 ChannelFuture channelFuture = channel.writeAndFlush(
202 new FileRequest(
203 remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
204 deleteAfterFetch));
205
206 channelFuture.addListener(
207 new ChannelFutureListener() {
208
209 @Override
210 public void operationComplete(ChannelFuture channelFuture) {
211 if (channelFuture.isSuccess()) {
212 return;
213 }
214
215 if (channelFuture.isCancelled()) {
216 defaultNoticeableFuture.cancel(true);
217
218 return;
219 }
220
221 Throwable throwable = new IOException(
222 "Unable to fetch remote file " + remoteFilePath,
223 channelFuture.cause());
224
225 if (!asyncBroker.takeWithException(
226 remoteFilePath, throwable)) {
227
228 _log.error(
229 "Unable to place exception because no future " +
230 "exists with ID " + remoteFilePath,
231 throwable);
232 }
233 }
234
235 });
236 }
237
238 return new NoticeableFutureConverter<Path, FileResponse>(
239 noticeableFuture) {
240
241 @Override
242 protected Path convert(FileResponse fileResponse)
243 throws IOException {
244
245 if (fileResponse.isFileNotFound()) {
246 if (_log.isWarnEnabled()) {
247 _log.warn(
248 "Remote file " + remoteFilePath +
249 " is not found");
250 }
251
252 return null;
253 }
254
255 if (fileResponse.isFileNotModified()) {
256 if (_log.isDebugEnabled()) {
257 _log.debug(
258 "Remote file " + remoteFilePath +
259 " is not modified, use cached local file " +
260 cachedLocalFilePath);
261 }
262
263 return cachedLocalFilePath;
264 }
265
266 Path targetLocalFilePath = localFilePath;
267
268 synchronized (fileResponse) {
269 Path recheckCacheLocalFilePath = pathMap.get(
270 remoteFilePath);
271
272 if (recheckCacheLocalFilePath != null) {
273 targetLocalFilePath = recheckCacheLocalFilePath;
274 }
275
276 Path tempLocalFilePath = fileResponse.getLocalFile();
277
278 if (tempLocalFilePath.startsWith(repositoryPath)) {
279 Files.copy(
280 fileResponse.getLocalFile(),
281 targetLocalFilePath,
282 StandardCopyOption.REPLACE_EXISTING);
283 }
284 else {
285 Files.move(
286 fileResponse.getLocalFile(),
287 targetLocalFilePath,
288 StandardCopyOption.REPLACE_EXISTING);
289 }
290
291 if (populateCache) {
292 pathMap.put(remoteFilePath, targetLocalFilePath);
293 }
294
295 fileResponse.setLocalFile(targetLocalFilePath);
296 }
297
298 if (_log.isDebugEnabled()) {
299 _log.debug(
300 "Fetched remote file " + remoteFilePath + " to " +
301 targetLocalFilePath);
302 }
303
304 return targetLocalFilePath;
305 }
306
307 };
308 }
309
310 protected final AsyncBroker<Path, FileResponse> asyncBroker =
311 new AsyncBroker<Path, FileResponse>();
312 protected final long getFileTimeout;
313 protected final Map<Path, Path> pathMap =
314 new ConcurrentHashMap<Path, Path>();
315 protected final Path repositoryPath;
316
317 private static final Log _log = LogFactoryUtil.getLog(
318 NettyRepository.class);
319
320 }