001package net.sf.jkniv.whinstone.cassandra.statement;
002
003import java.math.BigDecimal;
004import java.math.BigInteger;
005import java.nio.ByteBuffer;
006import java.sql.SQLException;
007import java.sql.Statement;
008import java.util.Calendar;
009import java.util.Collections;
010import java.util.Date;
011import java.util.Iterator;
012import java.util.List;
013import java.util.Map;
014import java.util.Set;
015
016import org.slf4j.Logger;
017
018import com.datastax.driver.core.BoundStatement;
019import com.datastax.driver.core.ColumnDefinitions;
020import com.datastax.driver.core.PreparedStatement;
021import com.datastax.driver.core.ResultSet;
022import com.datastax.driver.core.Row;
023import com.datastax.driver.core.Session;
024
025import net.sf.jkniv.exception.HandlerException;
026import net.sf.jkniv.experimental.TimerKeeper;
027import net.sf.jkniv.reflect.beans.CapitalNameFactory;
028import net.sf.jkniv.reflect.beans.Capitalize;
029import net.sf.jkniv.reflect.beans.ObjectProxy;
030import net.sf.jkniv.reflect.beans.ObjectProxyFactory;
031import net.sf.jkniv.reflect.beans.PropertyAccess;
032import net.sf.jkniv.sqlegance.OneToMany;
033import net.sf.jkniv.sqlegance.RepositoryException;
034import net.sf.jkniv.sqlegance.logger.DataMasking;
035import net.sf.jkniv.whinstone.JdbcColumn;
036import net.sf.jkniv.whinstone.Param;
037import net.sf.jkniv.whinstone.Queryable;
038import net.sf.jkniv.whinstone.ResultRow;
039import net.sf.jkniv.whinstone.ResultSetParser;
040import net.sf.jkniv.whinstone.cassandra.CassandraColumn;
041import net.sf.jkniv.whinstone.cassandra.LoggerFactory;
042import net.sf.jkniv.whinstone.cassandra.RegisterCodec;
043import net.sf.jkniv.whinstone.classification.Groupable;
044import net.sf.jkniv.whinstone.classification.GroupingBy;
045import net.sf.jkniv.whinstone.classification.NoGroupingBy;
046import net.sf.jkniv.whinstone.classification.Transformable;
047import net.sf.jkniv.whinstone.statement.AutoKey;
048import net.sf.jkniv.whinstone.statement.StatementAdapter;
049import net.sf.jkniv.whinstone.types.Convertible;
050import net.sf.jkniv.whinstone.types.RegisterType;
051
052/*
053 * https://docs.datastax.com/en/developer/java-driver/3.1/manual/statements/prepared/
054 * 
055 * //FIXME unsupported method bound.setInet(...)
056 * //FIXME unsupported method bound.setConsistencyLevel(ConsistencyLevel)
057 * //FIXME unsupported method bound.setIdempotent(boolean)
058 * //FIXME unsupported method bound.setBytes(...ByteBuffer)
059 * //FIXME unsupported method bound.setInet(...InetAddress)
060 * //FIXME unsupported method bound.setPartitionKeyToken(Token)
061 * //FIXME unsupported method bound.setRoutingKey(ByteBuffer) 
062 * //FIXME unsupported method bound.setToken(...Token)
063 * //FIXME unsupported method bound.setUUID(...UUID)
064 * 
065 * @author Alisson Gomes
066 * @since 0.6.0
067 */
068@SuppressWarnings({ "unchecked", "rawtypes" })
069public class CassandraPreparedStatementAdapter<T, R> implements StatementAdapter<T, Row>
070{
071    private static final Logger  LOG = LoggerFactory.getLogger();
072    private static final Logger SQLLOG = net.sf.jkniv.whinstone.cassandra.LoggerFactory.getLogger();
073    private static final DataMasking  MASKING = LoggerFactory.getDataMasking();
074    private static final Capitalize  CAPITAL_SETTER = CapitalNameFactory.getInstanceOfSetter();
075    private final HandlerException  handlerException;
076    private final PreparedStatement stmt;
077    private final Class<T>          returnType;
078    private final Session           session;
079    private final Queryable         queryable;
080    private BoundStatement          bound;
081    private int                     index;
082    private ResultRow<T, Row>       resultRow;
083    private boolean                 scalar;
084    private AutoKey                 autoKey;
085    private final RegisterType      registerType;
086    private final RegisterCodec    registerCodec;
087    
088    public CassandraPreparedStatementAdapter(Session session, PreparedStatement stmt, Queryable queryable, RegisterType registerType, RegisterCodec registerCodec)
089    {
090        this.stmt = stmt;
091        this.session = session;
092        this.registerType = registerType;
093        this.registerCodec = registerCodec;
094        this.bound = stmt.bind();
095        this.handlerException = new HandlerException(RepositoryException.class, "Cannot set parameter [%s] value [%s]");
096        this.queryable = queryable;
097        this.returnType = (Class<T>)queryable.getReturnType();
098        this.reset();
099    }
100    
101    /**
102     * Creates a new BoundStatement object for this prepared statement. 
103     * This method do not bind any values to any of the prepared variables.
104     */
105    public void reBound() 
106    {
107        this.bound = stmt.bind();
108    }
109    
110    @Override
111    public StatementAdapter<T, Row> with(ResultRow<T, Row> resultRow)
112    {
113        this.resultRow = resultRow;
114        return this;
115    }
116    
117    @Override
118    public StatementAdapter<T, Row> bind(String name, Object value)
119    {
120        log(name, value);
121        if (name.toLowerCase().startsWith("in:"))
122        {
123            try
124            {
125                setValueIN((Object[]) value);
126                return this;
127            }
128            catch (SQLException e)
129            {
130                this.handlerException.handle(e);// FIXME handler default message with custom params
131            }
132        }
133        return bindInternal(value);
134    }
135    
136    @Override
137    public StatementAdapter<T, Row> bind(Param param)
138    {
139        Object value = param.getValueAs();
140        log(param);
141        try
142        {
143            bindInternal(value);
144        }
145        catch (Exception e)
146        {
147            this.handlerException.handle(e);// FIXME handler default message with custom params
148        }
149        return this;
150    }
151    
152    @Override
153    public StatementAdapter<T, Row> bind(Param... values)
154    {
155        //this.bound = stmt.bind(values);
156        //this.index += values.length-1;
157        for (int j=0; j<values.length; j++)
158        {
159            Param v = values[j];
160            bind(v);
161        }
162        return this;
163    }
164    /*
165    @Override
166    public void batch()
167    {
168        // TODO implements batch https://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/BatchStatement.html
169        // TODO implements batch https://docs.datastax.com/en/drivers/python/3.2/api/cassandra/query.html
170        // TODO implements batch https://docs.datastax.com/en/cql/3.3/cql/cql_using/useBatch.html
171        // TODO implements batch https://docs.datastax.com/en/cql/3.3/cql/cql_using/useBatchGoodExample.html
172        // TODO https://www.datastax.com/dev/blog/client-side-improvements-in-cassandra-2-0
173    }
174    */
175    
176    public List<T> rows()
177    {
178        ResultSet rs = null;
179        ResultSetParser<T, ResultSet> rsParser = null;
180        Groupable<T, ?> grouping = new NoGroupingBy<T, T>();
181        List<T> list = Collections.emptyList();
182        try
183        {
184            TimerKeeper.start();
185            rs = session.execute(bound);
186            queryable.getDynamicSql().getStats().add(TimerKeeper.clear());
187            
188            JdbcColumn<Row>[] columns = getJdbcColumns(rs.getColumnDefinitions());
189            setResultRow(columns);
190            //LOG.debug("AvailableWithoutFetching={}, FullyFetched={}, Exhausted={}", rs.getAvailableWithoutFetching(), rs.isFullyFetched(), rs.isExhausted());
191            Transformable<T> transformable = resultRow.getTransformable();
192            if (hasGroupingBy())
193            {
194                grouping = new GroupingBy(getGroupingBy(), returnType, transformable);
195            }
196            rsParser = new ObjectResultSetParser(resultRow, grouping);
197            list = rsParser.parser(rs);
198        }
199        catch (SQLException e)
200        {
201            queryable.getDynamicSql().getStats().add(e);
202            handlerException.handle(e, e.getMessage());
203        }
204        finally {
205            TimerKeeper.clear();            
206        }
207        return list;
208    }
209    
210    @Override
211    public void bindKey()
212    {
213        String[] properties = queryable.getDynamicSql().asInsertable().getAutoGeneratedKey().getPropertiesAsArray();
214        ObjectProxy<?> proxy = ObjectProxyFactory.of((Object)queryable.getParams());
215        Iterator<Object> it = autoKey.iterator();
216        for(int i=0; i<properties.length; i++)
217            setValueOfKey(proxy, properties[i], it.next());
218    }
219    
220    private void setValueOfKey(ObjectProxy<?> proxy, String property, Object value)
221    {
222        Convertible<Object, Object> converter = registerType.toJdbc(new PropertyAccess(property, proxy.getTargetClass()), proxy);
223        Object parsedValue = value;
224        if (!converter.getType().isInstance(value))
225            parsedValue = converter.toAttribute(value);
226        proxy.invoke(CAPITAL_SETTER.does(property), parsedValue);
227    }
228
229    @Override
230    public StatementAdapter<T, Row> with(AutoKey generateKey)
231    {
232        this.autoKey = generateKey;
233        return this;
234    }
235    
236    @Override
237    public int execute()
238    {
239        session.execute(bound);
240        return Statement.SUCCESS_NO_INFO; // FIXME design Statement.SUCCESS_NO_INFO
241    }
242    
243    @Override
244    public int reset()
245    {
246        int before = (index);
247        index = 0;
248        reBound();
249        return before;
250    }
251    
252    private void setValueIN(Object[] paramsIN) throws SQLException
253    {
254        int j = 0;
255        for (; j < paramsIN.length; j++)
256            bindInternal(paramsIN[j]);
257        //indexIN = indexIN + j;
258        /*
259        int j = 0;
260        for (; j < paramsIN.length; j++)
261            stmt.setObject(index+indexIN + j, paramsIN[j]);
262        indexIN = indexIN + j;
263         */
264    }
265    
266    /*******************************************************************************/
267    private StatementAdapter<T, Row> bindInternal(Object value)
268    {
269        if (value == null)
270        {       
271            setToNull();
272            return this;
273        }
274        try
275        {
276            String classNameValue = value.getClass().getName();
277            if (Enum.class.isInstance(value))
278                setInternalValue((Enum<?>) value);
279            else if (value instanceof List)
280                setInternalValue((List) value);
281            else if (value instanceof Set)
282                setInternalValue((Set) value);
283            else if (value instanceof Map)
284                setInternalValue((Map) value);
285            else if (classNameValue.equals("java.time.Instant"))
286                bound.set(currentIndex(), value, registerCodec.getCodec("InstantCodec").instance);
287            else if (classNameValue.equals("java.time.LocalDate"))
288                bound.set(currentIndex(), value, registerCodec.getCodec("LocalDateCodec").instance);
289            else if (classNameValue.equals("java.time.LocalDateTime"))
290                bound.set(currentIndex(), value, registerCodec.getCodec("LocalDateTimeCodec").instance);
291            else if (classNameValue.equals("java.time.LocalTime"))
292                bound.set(currentIndex(), value, registerCodec.getCodec("LocalTimeCodec").instance);
293            else if (classNameValue.equals("java.time.ZonedDateTime"))
294                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.ZonedDateTimeCodec").instance);
295            else if (classNameValue.equals("java.util.Optional"))
296                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.OptionalCodec").instance);
297            else if (classNameValue.equals("java.time.ZoneId"))
298                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.ZoneIdCodec").instance);
299            else
300                setObjectValue(value);
301            /*
302            String classNameValue = value.getClass().getName();
303            if (value instanceof String)
304                setInternalValue((String) value);
305            else if (value instanceof Integer)
306                setInternalValue((Integer) value);
307            else if (value instanceof Long)
308                setInternalValue((Long) value);
309            else if (value instanceof Double)
310                setInternalValue((Double) value);
311            else if (value instanceof Float)
312                setInternalValue((Float) value);
313            else if (value instanceof Boolean)
314                setInternalValue((Boolean) value);
315            else if (value instanceof BigDecimal)
316                setInternalValue((BigDecimal) value);
317            else if (value instanceof Date)
318                setInternalValue((Date) value);
319            else if (value instanceof java.util.Calendar)
320                setInternalValue((Calendar) value);
321            else if (Enum.class.isInstance(value))
322                setInternalValue((Enum<?>) value);
323            else if (value instanceof List)
324                setInternalValue((List) value);
325            else if (value instanceof Set)
326                setInternalValue((Set) value);
327            else if (value instanceof Map)
328                setInternalValue((Map) value);
329            else if (value instanceof com.datastax.driver.core.Duration)
330                setValue((com.datastax.driver.core.Duration)value);
331            else if (classNameValue.equals("java.time.Instant"))
332                bound.set(currentIndex(), value, registerCodec.getCodec("InstantCodec").instance);
333            else if (classNameValue.equals("java.time.LocalDate"))
334                bound.set(currentIndex(), value, registerCodec.getCodec("LocalDateCodec").instance);
335            else if (classNameValue.equals("java.time.LocalDateTime"))
336                bound.set(currentIndex(), value, registerCodec.getCodec("LocalDateTimeCodec").instance);
337            else if (classNameValue.equals("java.time.LocalTime"))
338                bound.set(currentIndex(), value, registerCodec.getCodec("LocalTimeCodec").instance);
339            else if (classNameValue.equals("java.time.ZonedDateTime"))
340                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.ZonedDateTimeCodec").instance);
341            else if (classNameValue.equals("java.util.Optional"))
342                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.OptionalCodec").instance);
343            else if (classNameValue.equals("java.time.ZoneId"))
344                bound.set(currentIndex(), value, registerCodec.getCodec("jkd8.ZoneIdCodec").instance);
345            else if (value instanceof BigInteger)
346                setInternalValue((BigInteger) value);
347            else if (value instanceof Short)
348                setInternalValue((Short) value);
349            else if (value instanceof com.datastax.driver.core.LocalDate)
350                setInternalValue((com.datastax.driver.core.LocalDate) value);
351            else if (value instanceof Byte)
352                setInternalValue((Byte) value);
353            else if (value instanceof ByteBuffer)
354                setInternalValue((ByteBuffer) value);
355            else
356            {
357                LOG.warn("CANNOT Set SQL Parameter from index [{}] with value of [{}] type of [{}]", (index), 
358                        value, (value == null ? "NULL" : value.getClass()));
359
360                //setValue(value);
361            }
362            */
363        }
364        catch (SQLException e)
365        {
366            this.handlerException.handle(e);// FIXME handler default message with custom params
367        }
368        return this;
369    }
370
371    private void setObjectValue(Object value)
372    {
373        ObjectProxy proxy = ObjectProxyFactory.of(value);
374        bound.set(currentIndex(), value, proxy.getTargetClass());
375    }
376    
377    private void setValue(com.datastax.driver.core.Duration value)
378    {
379        bound.set(currentIndex(), value, com.datastax.driver.core.Duration.class);
380    }
381    
382    private void setInternalValue(com.datastax.driver.core.LocalDate value)
383    {
384        bound.setDate(currentIndex(), value);
385    }
386    
387    private void setInternalValue(Calendar value)
388    {
389        bound.setTimestamp(currentIndex(), value.getTime());
390    }
391    
392    private void setInternalValue(Date value)
393    {
394        bound.setTimestamp(currentIndex(), value);
395    }
396    
397    private void setInternalValue(Integer value)
398    {
399        bound.setInt(currentIndex(), value);
400    }
401    
402    private void setInternalValue(Long value)
403    {
404        bound.setLong(currentIndex(), value);
405    }
406    
407    private void setInternalValue(Float value)
408    {
409        bound.setFloat(currentIndex(), value);
410    }
411    
412    private void setInternalValue(Double value)
413    {
414        bound.setDouble(currentIndex(), value);
415    }
416    
417    private void setInternalValue(Short value)
418    {
419        bound.setShort(currentIndex(), value);
420    }
421    
422    private void setInternalValue(Boolean value)
423    {
424        bound.setBool(currentIndex(), value);
425    }
426    
427    private void setInternalValue(Byte value)
428    {
429        bound.setByte(currentIndex(), value);
430    }
431
432    private void setInternalValue(ByteBuffer value)
433    {
434        bound.setBytes(currentIndex(), value);
435    }
436    
437    private void setInternalValue(BigDecimal value)
438    {
439        bound.setDecimal(currentIndex(), value);
440    }
441    
442    private void setInternalValue(BigInteger value)
443    {
444        bound.setVarint(currentIndex(), value);
445    }
446    
447    private void setInternalValue(String value)
448    {
449        bound.setString(currentIndex(), value);
450    }
451    
452    private void setToNull()
453    {
454        bound.setToNull(currentIndex());
455    }
456    
457    private void setInternalValue(Enum<?> value) throws SQLException
458    {
459        // FIXME design converter to allow save ordinal value or other value from enum
460        bound.setString(currentIndex(), value.name());
461    }
462
463    private void setInternalValue(List<?> value) throws SQLException
464    {
465        bound.setList(currentIndex(), value);
466    }
467
468    private void setInternalValue(Map<?,?> value) throws SQLException
469    {
470        bound.setMap(currentIndex(), value);
471    }
472
473    private void setInternalValue(Set<?> value) throws SQLException
474    {
475        bound.setSet(currentIndex(), value);
476    }
477
478    private int currentIndex()
479    {
480        return (index++);
481    }
482    
483    /*******************************************************************************/
484    
485    private void setResultRow(JdbcColumn<Row>[] columns)
486    {
487        if (resultRow != null)
488        {
489            resultRow.setColumns(columns);
490            return;
491        }
492        
493        if (scalar)
494            resultRow = new ScalarResultRow(columns);
495        else if (Map.class.isAssignableFrom(returnType))
496            resultRow = new MapResultRow(returnType, columns);
497        else if (Number.class.isAssignableFrom(returnType)) // FIXME implements for date, calendar, boolean improve design
498            resultRow = new NumberResultRow(returnType, columns);
499        else if (String.class.isAssignableFrom(returnType))
500            resultRow = new StringResultRow(columns);
501        else if (!hasOneToMany())
502            resultRow = new FlatObjectResultRow(returnType, columns);
503        else
504            resultRow = new PojoResultRow(returnType, columns, getOneToMany());
505    }
506    
507    private void log(String name, Object value)
508    {
509        if (SQLLOG.isDebugEnabled())
510            SQLLOG.debug("Setting SQL Parameter from index [{}] with name [{}] with value of [{}] type of [{}]", index, name,
511                    MASKING.mask(name, value), (value == null ? "NULL" : value.getClass()));
512    }
513    
514    private void log(Param param)
515    {
516        if (SQLLOG.isDebugEnabled())
517            SQLLOG.debug("Setting SQL Parameter from index [{}] with name [{}] with value of [{}] type of [{}]", index, param.getName(),
518                    MASKING.mask(param.getName(), param.getValueAs()), (param.getValueAs() == null ? "NULL" : param.getValueAs().getClass()));
519    }
520    
521    /**
522     * Summarize the columns from SQL result in binary data or not.
523     * @param metadata  object that contains information about the types and properties of the columns in a <code>ResultSet</code> 
524     * @return Array of columns with name and index
525     */
526    private JdbcColumn<Row>[] getJdbcColumns(ColumnDefinitions metadata)
527    {
528        JdbcColumn<Row>[] columns = new JdbcColumn[metadata.size()];
529        
530        for (int i = 0; i < columns.length; i++)
531        {
532            String columnName = metadata.getName(i);//getColumnName(metadata, columnNumber);
533            columns[i] = new CassandraColumn(i, columnName, metadata.getType(i).getName(), registerType, queryable.getReturnType());
534        }
535        return columns;
536    }
537
538    @Override
539    public void close()
540    {
541        // TODO how to close cassandra statement ?
542    }
543    
544    @Override
545    public void setFetchSize(int rows)
546    {
547        LOG.warn("Cassandra " + stmt.getClass() + " doesn't support fetch size!");
548    }
549    
550    private boolean hasOneToMany()
551    {
552        return !queryable.getDynamicSql().asSelectable().getOneToMany().isEmpty();
553    }
554
555    private Set<OneToMany> getOneToMany()
556    {
557        return queryable.getDynamicSql().asSelectable().getOneToMany();
558    }
559
560    private boolean hasGroupingBy()
561    {
562        return !queryable.getDynamicSql().asSelectable().getGroupByAsList().isEmpty();
563    }
564
565    private List<String> getGroupingBy()
566    {
567        return queryable.getDynamicSql().asSelectable().getGroupByAsList();        
568    }
569}