001
014
015 package com.liferay.portal.kernel.dao.jdbc;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018 import com.liferay.portal.kernel.concurrent.FutureListener;
019 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022 import com.liferay.portal.kernel.util.GetterUtil;
023 import com.liferay.portal.kernel.util.PropsKeys;
024 import com.liferay.portal.kernel.util.PropsUtil;
025 import com.liferay.portal.kernel.util.ProxyUtil;
026
027 import java.lang.reflect.InvocationHandler;
028 import java.lang.reflect.Method;
029
030 import java.sql.Connection;
031 import java.sql.DatabaseMetaData;
032 import java.sql.PreparedStatement;
033 import java.sql.SQLException;
034
035 import java.util.Set;
036 import java.util.concurrent.Callable;
037 import java.util.concurrent.ExecutionException;
038 import java.util.concurrent.Future;
039
040
044 public class AutoBatchPreparedStatementUtil {
045
046 public static PreparedStatement autoBatch(
047 PreparedStatement preparedStatement)
048 throws SQLException {
049
050 Connection connection = preparedStatement.getConnection();
051
052 DatabaseMetaData databaseMetaData = connection.getMetaData();
053
054 if (databaseMetaData.supportsBatchUpdates()) {
055 return (PreparedStatement)ProxyUtil.newProxyInstance(
056 ClassLoader.getSystemClassLoader(), _interfaces,
057 new BatchInvocationHandler(preparedStatement));
058 }
059
060 return (PreparedStatement)ProxyUtil.newProxyInstance(
061 ClassLoader.getSystemClassLoader(), _interfaces,
062 new NoBatchInvocationHandler(preparedStatement));
063 }
064
065 public static PreparedStatement concurrentAutoBatch(
066 Connection connection, String sql)
067 throws SQLException {
068
069 DatabaseMetaData databaseMetaData = connection.getMetaData();
070
071 if (databaseMetaData.supportsBatchUpdates()) {
072 return (PreparedStatement)ProxyUtil.newProxyInstance(
073 ClassLoader.getSystemClassLoader(), _interfaces,
074 new ConcurrentBatchInvocationHandler(connection, sql));
075 }
076
077 return (PreparedStatement)ProxyUtil.newProxyInstance(
078 ClassLoader.getSystemClassLoader(), _interfaces,
079 new ConcurrentNoBatchInvocationHandler(connection, sql));
080 }
081
082 private static final int _HIBERNATE_JDBC_BATCH_SIZE = GetterUtil.getInteger(
083 PropsUtil.get(PropsKeys.HIBERNATE_JDBC_BATCH_SIZE));
084
085 private static final Method _addBatchMethod;
086 private static final Method _closeMethod;
087 private static final Method _executeBatch;
088 private static final Class<?>[] _interfaces =
089 new Class<?>[] {PreparedStatement.class};
090
091 static {
092 try {
093 _addBatchMethod = PreparedStatement.class.getMethod("addBatch");
094 _closeMethod = PreparedStatement.class.getMethod("close");
095 _executeBatch = PreparedStatement.class.getMethod("executeBatch");
096 }
097 catch (NoSuchMethodException nsme) {
098 throw new ExceptionInInitializerError(nsme);
099 }
100 }
101
102 private static class BatchInvocationHandler implements InvocationHandler {
103
104 @Override
105 public Object invoke(Object proxy, Method method, Object[] args)
106 throws Throwable {
107
108 if (method.equals(_executeBatch)) {
109 if (_count > 0) {
110 _count = 0;
111
112 return _preparedStatement.executeBatch();
113 }
114
115 return new int[0];
116 }
117
118 if (!method.equals(_addBatchMethod)) {
119 return method.invoke(_preparedStatement, args);
120 }
121
122 _preparedStatement.addBatch();
123
124 if (++_count >= _HIBERNATE_JDBC_BATCH_SIZE) {
125 _preparedStatement.executeBatch();
126
127 _count = 0;
128 }
129
130 return null;
131 }
132
133 private BatchInvocationHandler(PreparedStatement preparedStatement) {
134 _preparedStatement = preparedStatement;
135 }
136
137 private int _count;
138 private final PreparedStatement _preparedStatement;
139
140 }
141
142 private static class ConcurrentBatchInvocationHandler
143 implements InvocationHandler {
144
145 @Override
146 public Object invoke(Object proxy, Method method, Object[] args)
147 throws Throwable {
148
149 if (method.equals(_addBatchMethod)) {
150 _preparedStatement.addBatch();
151
152 if (++_count >= _HIBERNATE_JDBC_BATCH_SIZE) {
153 _executeBatch();
154 }
155
156 return null;
157 }
158
159 if (method.equals(_executeBatch)) {
160 if (_count > 0) {
161 _executeBatch();
162 }
163
164 return new int[0];
165 }
166
167 if (method.equals(_closeMethod)) {
168 Throwable throwable = null;
169
170 for (Future<Void> future : _futures) {
171 try {
172 future.get();
173 }
174 catch (Throwable t) {
175 if (t instanceof ExecutionException) {
176 t = t.getCause();
177 }
178
179 if (throwable == null) {
180 throwable = t;
181 }
182 else {
183 throwable.addSuppressed(t);
184 }
185 }
186 }
187
188 if (throwable != null) {
189 throw throwable;
190 }
191 }
192
193 return method.invoke(_preparedStatement, args);
194 }
195
196 private ConcurrentBatchInvocationHandler(
197 Connection connection, String sql)
198 throws SQLException {
199
200 _connection = connection;
201 _sql = sql;
202
203 _preparedStatement = _connection.prepareStatement(_sql);
204 }
205
206 private void _executeBatch() throws SQLException {
207 _count = 0;
208
209 final PreparedStatement preparedStatement = _preparedStatement;
210
211 NoticeableFuture<Void> noticeableFuture =
212 _threadPoolExecutor.submit(
213 new Callable<Void>() {
214
215 @Override
216 public Void call() throws SQLException {
217 try {
218 preparedStatement.executeBatch();
219 }
220 finally {
221 synchronized (_connection) {
222 preparedStatement.close();
223 }
224 }
225
226 return null;
227 }
228
229 });
230
231 _futures.add(noticeableFuture);
232
233 noticeableFuture.addFutureListener(
234 new FutureListener<Void>() {
235
236 @Override
237 public void complete(Future<Void> future) {
238 try {
239 future.get();
240
241 _futures.remove(future);
242 }
243 catch (Throwable t) {
244 }
245 }
246
247 });
248
249 _preparedStatement = _connection.prepareStatement(_sql);
250 }
251
252 private final Connection _connection;
253 private int _count;
254 private final Set<Future<Void>> _futures = new ConcurrentHashSet<>();
255 private PreparedStatement _preparedStatement;
256 private final String _sql;
257 private final ThreadPoolExecutor _threadPoolExecutor =
258 PortalExecutorManagerUtil.getPortalExecutor(
259 ConcurrentBatchInvocationHandler.class.getName());
260
261 }
262
263 private static class ConcurrentNoBatchInvocationHandler
264 implements InvocationHandler {
265
266 @Override
267 public Object invoke(Object proxy, Method method, Object[] args)
268 throws Throwable {
269
270 if (method.equals(_addBatchMethod)) {
271 _executeUpdate();
272
273 return null;
274 }
275
276 if (method.equals(_executeBatch)) {
277 return new int[0];
278 }
279
280 if (method.equals(_closeMethod)) {
281 Throwable throwable = null;
282
283 for (Future<Void> future : _futures) {
284 try {
285 future.get();
286 }
287 catch (Throwable t) {
288 if (t instanceof ExecutionException) {
289 t = t.getCause();
290 }
291
292 if (throwable == null) {
293 throwable = t;
294 }
295 else {
296 throwable.addSuppressed(t);
297 }
298 }
299 }
300
301 if (throwable != null) {
302 throw throwable;
303 }
304 }
305
306 return method.invoke(_preparedStatement, args);
307 }
308
309 private ConcurrentNoBatchInvocationHandler(
310 Connection connection, String sql)
311 throws SQLException {
312
313 _connection = connection;
314 _sql = sql;
315
316 _preparedStatement = _connection.prepareStatement(_sql);
317 }
318
319 private void _executeUpdate() throws SQLException {
320 final PreparedStatement preparedStatement = _preparedStatement;
321
322 NoticeableFuture<Void> noticeableFuture =
323 _threadPoolExecutor.submit(
324 new Callable<Void>() {
325
326 @Override
327 public Void call() throws SQLException {
328 try {
329 preparedStatement.executeUpdate();
330 }
331 finally {
332 synchronized (_connection) {
333 preparedStatement.close();
334 }
335 }
336
337 return null;
338 }
339
340 });
341
342 _futures.add(noticeableFuture);
343
344 noticeableFuture.addFutureListener(
345 new FutureListener<Void>() {
346
347 @Override
348 public void complete(Future<Void> future) {
349 try {
350 future.get();
351
352 _futures.remove(future);
353 }
354 catch (Throwable t) {
355 }
356 }
357
358 });
359
360 _preparedStatement = _connection.prepareStatement(_sql);
361 }
362
363 private final Connection _connection;
364 private final Set<Future<Void>> _futures = new ConcurrentHashSet<>();
365 private PreparedStatement _preparedStatement;
366 private final String _sql;
367 private final ThreadPoolExecutor _threadPoolExecutor =
368 PortalExecutorManagerUtil.getPortalExecutor(
369 ConcurrentNoBatchInvocationHandler.class.getName());
370
371 }
372
373 private static class NoBatchInvocationHandler implements InvocationHandler {
374
375 @Override
376 public Object invoke(Object proxy, Method method, Object[] args)
377 throws Throwable {
378
379 if (method.equals(_addBatchMethod)) {
380 _preparedStatement.executeUpdate();
381
382 return null;
383 }
384
385 if (method.equals(_executeBatch)) {
386 return new int[0];
387 }
388
389 return method.invoke(_preparedStatement, args);
390 }
391
392 private NoBatchInvocationHandler(PreparedStatement preparedStatement) {
393 _preparedStatement = preparedStatement;
394 }
395
396 private final PreparedStatement _preparedStatement;
397
398 }
399
400 }