001
014
015 package com.liferay.portal.fabric.netty.fileserver.handlers;
016
017 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018 import com.liferay.portal.fabric.netty.fileserver.FileResponse;
019 import com.liferay.portal.kernel.concurrent.AsyncBroker;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022
023 import io.netty.buffer.ByteBuf;
024 import io.netty.channel.ChannelHandlerContext;
025 import io.netty.channel.ChannelInboundHandlerAdapter;
026 import io.netty.channel.ChannelPipeline;
027 import io.netty.util.concurrent.EventExecutor;
028
029 import java.io.IOException;
030
031 import java.nio.channels.FileChannel;
032 import java.nio.file.Files;
033 import java.nio.file.Path;
034 import java.nio.file.StandardOpenOption;
035 import java.nio.file.attribute.FileTime;
036
037 import java.util.concurrent.Callable;
038
039
042 public class FileUploadChannelHandler extends ChannelInboundHandlerAdapter {
043
044 public FileUploadChannelHandler(
045 AsyncBroker<Path, FileResponse> asyncBroker,
046 FileResponse fileResponse, EventExecutor eventExecutor)
047 throws IOException {
048
049 if (asyncBroker == null) {
050 throw new NullPointerException("Async broker is null");
051 }
052
053 if (fileResponse == null) {
054 throw new NullPointerException("File response is null");
055 }
056
057 if (eventExecutor == null) {
058 throw new NullPointerException("Event executor is null");
059 }
060
061 if (fileResponse.getSize() < 1) {
062 throw new IllegalArgumentException(
063 "File response has no content for uploading");
064 }
065
066 this.asyncBroker = asyncBroker;
067 this.fileResponse = fileResponse;
068 this.eventExecutor = eventExecutor;
069
070 tempFilePath = Files.createTempFile(
071 FileUploadChannelHandler.class.getName() + "-", null);
072
073 fileChannel = FileChannel.open(
074 tempFilePath, StandardOpenOption.WRITE,
075 StandardOpenOption.TRUNCATE_EXISTING);
076
077 fileResponse.setLocalFile(tempFilePath);
078 }
079
080 @Override
081 public void channelRead(
082 ChannelHandlerContext channelHandlerContext, Object object)
083 throws IOException {
084
085 ByteBuf byteBuf = (ByteBuf)object;
086
087 if (!receive(byteBuf)) {
088 return;
089 }
090
091 fileChannel.close();
092
093 if (eventExecutor.inEventLoop() || !fileResponse.isFolder()) {
094 finish();
095 }
096 else {
097 eventExecutor.submit(
098 new Callable<Void>() {
099
100 @Override
101 public Void call() throws IOException {
102 try {
103 finish();
104 }
105 catch (IOException ioe) {
106 exceptionCaught(null, ioe);
107 }
108
109 return null;
110 }
111
112 });
113 }
114
115 ChannelPipeline channelPipeline = channelHandlerContext.pipeline();
116
117 channelPipeline.remove(this);
118
119 if (byteBuf.isReadable()) {
120 channelHandlerContext.fireChannelRead(object);
121 }
122 }
123
124 @Override
125 public void exceptionCaught(
126 ChannelHandlerContext channelHandlerContext, Throwable throwable)
127 throws IOException {
128
129 _log.error("File upload failure", throwable);
130
131 if (channelHandlerContext != null) {
132 ChannelPipeline channelPipeline = channelHandlerContext.pipeline();
133
134 channelPipeline.remove(this);
135 }
136
137 if (!asyncBroker.takeWithException(fileResponse.getPath(), throwable)) {
138 _log.error(
139 "Unable to place exception because no future exists with ID " +
140 fileResponse.getPath(),
141 throwable);
142 }
143
144 fileChannel.close();
145
146 Files.delete(tempFilePath);
147 }
148
149 protected void finish() throws IOException {
150 if (fileResponse.isFolder()) {
151 fileResponse.setLocalFile(
152 FileHelperUtil.unzip(
153 tempFilePath, FileHelperUtil.TEMP_DIR_PATH));
154
155 Files.delete(tempFilePath);
156 }
157
158 Files.setLastModifiedTime(
159 fileResponse.getLocalFile(),
160 FileTime.fromMillis(fileResponse.getLastModifiedTime()));
161
162 Path path = fileResponse.getPath();
163
164 if (!asyncBroker.takeWithResult(path, fileResponse)) {
165 _log.error(
166 "Unable to place result " + fileResponse +
167 " because no future exists with ID " + path);
168 }
169 }
170
171 protected boolean receive(ByteBuf byteBuf) throws IOException {
172 while (true) {
173 long readSize = fileResponse.getSize() - fileChannel.position();
174
175 int readableBytes = byteBuf.readableBytes();
176
177 if (readSize > readableBytes) {
178 readSize = readableBytes;
179 }
180
181 if (byteBuf.readBytes(fileChannel, (int)readSize) == readSize) {
182 break;
183 }
184 }
185
186 if (!byteBuf.isReadable()) {
187 byteBuf.release();
188 }
189
190 if (fileChannel.position() < fileResponse.getSize()) {
191 return false;
192 }
193
194 return true;
195 }
196
197 protected final AsyncBroker<Path, FileResponse> asyncBroker;
198 protected final EventExecutor eventExecutor;
199 protected final FileChannel fileChannel;
200 protected final FileResponse fileResponse;
201 protected final Path tempFilePath;
202
203 private static final Log _log = LogFactoryUtil.getLog(
204 FileUploadChannelHandler.class);
205
206 }