/*
 * JKNIV, whinstone one contract to access your database.
 *
 * Copyright (C) 2017, the original author or authors.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package net.sf.jkniv.whinstone.couchdb.commands;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;

import net.sf.jkniv.reflect.beans.ObjectProxy;
import net.sf.jkniv.reflect.beans.ObjectProxyFactory;
import net.sf.jkniv.reflect.beans.PropertyAccess;
import net.sf.jkniv.sqlegance.RepositoryException;
import net.sf.jkniv.sqlegance.Sql;
import net.sf.jkniv.whinstone.Queryable;
import net.sf.jkniv.whinstone.couchdb.HttpBuilder;
import net.sf.jkniv.whinstone.couchdb.statement.BulkCommandResponse;
import net.sf.jkniv.whinstone.couchdb.statement.BulkResponse;
import net.sf.jkniv.whinstone.couchdb.statement.DocBulk;

/**
 * Command that POSTs the collection held by a {@link Queryable} to the URL
 * returned by {@link HttpBuilder#getUrlForBulk()}. The collection is wrapped in a
 * {@link DocBulk} and serialized with {@code JsonMapper} when the command is built.
 *
 * <pre>
 *
 * http://docs.couchdb.org/en/2.2.0/api/database/bulk-api.html
 *
 * POST /{db}/_bulk_docs
 *
 * The bulk document API allows you to create and update multiple
 * documents at the same time within a single request. The basic
 * operation is similar to creating or updating a single document,
 * except that you batch the document structure and information.
 *
 * Parameters:
 *   db – Database name
 *
 * Request Headers:
 *
 *  - Accept
 *     + application/json
 *     + text/plain
 *  - Content-Type – application/json
 *  - X-Couch-Full-Commit – Overrides server’s commit policy. Possible values are: false and true. Optional
 *
 * Request JSON Object:
 *
 *  - docs (array) – List of documents objects
 *  - new_edits (boolean) – If false, prevents the database from assigning them new revision IDs. Default is true. Optional
 *
 * Response Headers:
 *
 *  Content-Type
 *     + application/json
 *     + text/plain; charset=utf-8
 *
 * Response JSON Array of Objects:
 *
 *  - id (string) – Document ID
 *  - rev (string) – New document revision token. Available if document has saved without errors. Optional
 *  - error (string) – Error type. Optional
 *  - reason (string) – Error reason. Optional
 *
 * Status Codes:
 *
 *  201 Created – Document(s) have been created or updated
 *  400 Bad Request – The request provided invalid JSON data
 *  417 Expectation Failed – Occurs when at least one document was rejected by a validation function
 * </pre>
 *
 * @author Alisson Gomes
 * @since 0.6.0
 *
 */
public class BulkCommand extends AbstractCommand implements CouchCommand
{
    private static final Logger LOG    = LoggerFactory.getLogger(BulkCommand.class);
    private static final Logger LOGSQL = net.sf.jkniv.whinstone.couchdb.LoggerFactory.getLogger();
    private final Queryable     queryable;
    private final HttpBuilder   httpBuilder;

    /**
     * Builds the bulk request body from the parameters of {@code queryable}.
     *
     * @param httpBuilder supplies the bulk URL and the request headers used by {@link #execute()}
     * @param queryable query whose parameters must be a {@link Collection} of documents
     * @throws RepositoryException when the parameters are not a {@code Collection}, or when the
     *         statement is deletable and the collection is empty or contains an element that
     *         does not report {@code isDeleted() == true}
     */
    public BulkCommand(HttpBuilder httpBuilder, Queryable queryable)
    {
        super();
        this.httpBuilder = httpBuilder;
        this.queryable = queryable;
        this.method = HttpMethod.POST;
        Object params = queryable.getParams();
        if (!(params instanceof Collection))
            throw new RepositoryException("Bulk command no supports "
                    + (params != null ? params.getClass().getName() : "null")
                    + " type, the parameters of queryable must be a Collection instance and > 0");

        Collection<?> documents = (Collection<?>) params;
        if (queryable.getDynamicSql().isDeletable())
            assertMarkedAsDeleted(documents);

        DocBulk docs = new DocBulk(documents);
        this.body = JsonMapper.mapper(docs);
    }

    /**
     * Verifies that every element of a DELETE bulk is flagged for deletion.
     * <p>
     * An element is accepted only when it exposes an {@code isDeleted} method that
     * returns {@code Boolean.TRUE}. Checking every element (instead of stopping at
     * the first flagged one) prevents unflagged documents from being posted by a
     * delete statement. An empty collection is rejected as well.
     *
     * @param documents the elements that are about to be posted
     * @throws RepositoryException naming the type of the first element that is not flagged,
     *         or {@code "null"} when the collection is empty
     */
    private static void assertMarkedAsDeleted(Collection<?> documents)
    {
        boolean broken = documents.isEmpty();
        String className = "null";
        for (Object document : documents)
        {
            ObjectProxy<?> proxy = ObjectProxyFactory.of(document);
            boolean deleted = false;
            if (proxy.hasMethod("isDeleted"))
            {
                Object mustDelete = proxy.invoke("isDeleted");
                deleted = mustDelete instanceof Boolean && ((Boolean) mustDelete).booleanValue();
            }
            if (!deleted)
            {
                // report the offending type, not whichever element happened to be last
                broken = true;
                className = proxy.getTargetClass().getName();
                break;
            }
        }
        if (broken)
            throw new RepositoryException(
                    "DELETE Bulk command must have an [boolean isDeleted()] method annotated with @JsonProperty(\"_deleted\") for "
                            + className + " type and setted as TRUE.");
    }

    /**
     * Posts the prepared body to the bulk URL and evaluates the answer.
     * <p>
     * When {@code isCreated(statusCode)} holds, the response is processed by
     * {@link #processResponse(Queryable, String)} and the number of documents reported
     * as OK is returned as an {@link Integer}. For any other status the {@code reason}
     * attribute of the JSON answer is read, the failure is logged and a
     * {@link RepositoryException} is raised. Every exception raised inside the call,
     * including that one, is passed to {@code handlerException}.
     * <p>
     * Both the HTTP response and the HTTP client created for this call are closed
     * before returning.
     *
     * @param <T> type expected by the caller; the value produced here is an {@code Integer}
     * @return the number of documents accepted, or {@code null} when a failure was passed to
     *         {@code handlerException} and that handler returned normally
     */
    @SuppressWarnings("unchecked")
    @Override
    public <T> T execute()
    {
        String json = null;
        CloseableHttpClient httpclient = null;
        CloseableHttpResponse response = null;
        T answer = null;
        try
        {
            httpclient = HttpClients.createDefault();
            String url = httpBuilder.getUrlForBulk();
            HttpRequestBase http = asPost().newHttp(url);
            ((HttpPost) http).setEntity(getEntity());

            // TODO supports header request X-Couch-Full-Commit – Overrides server’s commit policy. Possible values are: false and true. Optional
            httpBuilder.setHeader(http);
            printRequest(http);
            response = httpclient.execute(http);
            json = EntityUtils.toString(response.getEntity());
            printResponse(response, json);
            int statusCode = response.getStatusLine().getStatusCode();
            if (isCreated(statusCode))
            {
                BulkResponse bulkResponse = processResponse(queryable, json);
                answer = (T) Integer.valueOf(bulkResponse.getTotalOk());
            }
            else
            {
                Map<String, String> result = JsonMapper.mapper(json, Map.class);
                String reason = result.get("reason");
                LOG.error(errorFormat(http, response.getStatusLine(), json));
                throw new RepositoryException(response.getStatusLine().toString() + ", " + reason);
            }
        }
        catch (Exception e) // ClientProtocolException | JsonParseException | JsonMappingException | IOException
        {
            handlerException.handle(e);
        }
        finally
        {
            // Close the response first, then the client that produced it. The nested
            // finally keeps the client from leaking if reporting the first failure throws.
            try
            {
                closeAndReport(response);
            }
            finally
            {
                closeAndReport(httpclient);
            }
        }
        return answer;
    }

    /**
     * Closes {@code resource} when it is not {@code null}, passing an
     * {@link IOException} raised by the close to {@code handlerException}.
     *
     * @param resource the resource to close, may be {@code null}
     */
    private void closeAndReport(java.io.Closeable resource)
    {
        if (resource == null)
            return;
        try
        {
            resource.close();
        }
        catch (IOException e)
        {
            handlerException.handle(e);
        }
    }

    /**
     * Pairs each posted document with the response entry at the same position,
     * injects the returned id and revision into the document and counts the
     * entries reported as OK.
     *
     * @param queryable query whose parameters are the documents that were posted
     * @param json body of the HTTP response, expected to be a JSON array
     * @return the parsed entries together with the number of OK entries
     * @throws RepositoryException when the response holds fewer entries than documents were posted
     */
    private BulkResponse processResponse(Queryable queryable, String json)
    {
        BulkResponse response = new BulkResponse();
        List<BulkCommandResponse> bulk = JsonMapper.mapper(json, new TypeReference<List<BulkCommandResponse>>()
        {
        });
        response.setResponse(bulk);
        Sql sql = queryable.getDynamicSql();
        boolean isInsertable = sql.isInsertable();
        boolean autoGenerateKey = isInsertable && sql.asInsertable().isAutoGenerateKey();

        Collection<?> params = (Collection<?>) queryable.getParams();
        int i = 0, totalOk = 0;
        PropertyAccess accessId = queryable.getDynamicSql().getSqlDialect().getAccessId();
        PropertyAccess accessRev = queryable.getDynamicSql().getSqlDialect().getAccessRevision();
        for (Object param : params)
        {
            // fail with a meaningful message instead of a bare IndexOutOfBoundsException
            if (i >= bulk.size())
                throw new RepositoryException("Bulk response has " + bulk.size() + " result(s) but "
                        + params.size() + " document(s) were sent");

            BulkCommandResponse commandResponse = bulk.get(i++);
            ObjectProxy<?> proxy = ObjectProxyFactory.of(param);
            String id = (String) commandResponse.getId();
            String rev = (String) commandResponse.getRev();
            if (autoGenerateKey)
            {
                String properName = sql.asInsertable().getAutoGeneratedKey().getProperties();
                injectAutoIdentity(proxy, param, id, rev, properName, accessId, accessRev);
            }
            else
            {
                injectIdentity(proxy, param, id, rev, accessId, accessRev);
            }

            if (commandResponse.isOk())
            {
                totalOk++;
            }
            else if (isInsertable)
            {
                LOG.error("Document {} insert error", param);
            }
            else
            {
                LOG.error("Document {} update error [{}] reason [{}]", param, commandResponse.getError(),
                        commandResponse.getReason());
            }
        }
        response.setTotalOk(totalOk);
        return response;
    }

    /**
     * @return the JSON body built by the constructor
     */
    @Override
    public String getBody()
    {
        return this.body;
    }

    /**
     * Switches this command to the PUT method.
     *
     * @return {@code HttpMethod.PUT}
     */
    @Override
    public HttpMethod asPut()
    {
        this.method = HttpMethod.PUT;
        return this.method;
    }

    /**
     * Switches this command to the POST method.
     *
     * @return {@code HttpMethod.POST}
     */
    @Override
    public HttpMethod asPost()
    {
        this.method = HttpMethod.POST;
        return this.method;
    }

}