• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java BulkWriteError类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.mongodb.bulk.BulkWriteError的典型用法代码示例。如果您正苦于以下问题:Java BulkWriteError类的具体用法?Java BulkWriteError怎么用?Java BulkWriteError使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



BulkWriteError类属于com.mongodb.bulk包,在下文中一共展示了BulkWriteError类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: addWriteErrors

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
private void addWriteErrors(
        final List<BulkWriteError> wes,
        final Representation rep) {
    wes.stream().forEach(error -> {
        Representation nrep = new Representation();

        nrep.addProperty("index",
                new BsonInt32(error.getIndex()));
        nrep.addProperty("mongodbErrorCode",
                new BsonInt32(error.getCode()));
        nrep.addProperty("httpStatus",
                new BsonInt32(
                        ResponseHelper.getHttpStatusFromErrorCode(
                                error.getCode())));
        nrep.addProperty("message",
                new BsonString(
                        ResponseHelper.getMessageFromErrorCode(
                                error.getCode())));

        rep.addRepresentation("rh:error", nrep);
    });
}
 
开发者ID:SoftInstigate,项目名称:restheart,代码行数:23,代码来源:BulkResultRepresentationFactory.java


示例2: transferFlowFiles

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
protected void transferFlowFiles(ProcessSession session, List<FlowFile> flowFilesAttemptedUpdate, Map<Integer, BulkWriteError> writeErrors) {

        ComponentLog logger = this.getLogger();

        if (!writeErrors.isEmpty()) {
            logger.debug("Encountered errors on write");
            /*
             * For each Bulk Updated Document, see if it encountered an error.
             * If it had an error (based on index in the list), add the Mongo
             * Error to the FlowFile attribute and route to Failure. Otherwise,
             * route to Success
             */
            for (int i = 0; i < flowFilesAttemptedUpdate.size(); i++) {
                FlowFile ff = flowFilesAttemptedUpdate.get(i);
                if (writeErrors.containsKey(i)) {

                    logger.debug("Found error for FlowFile index {}", new Object[]{i});

                    // Add the error information to the FlowFileAttributes, and
                    // route to failure
                    BulkWriteError bwe = writeErrors.get(i);

                    logger.debug("FlowFile ID {} had Error Code {} and Message {}", new Object[]{ff.getId(), bwe.getCode(), bwe.getMessage()});

                    Map<String, String> failureAttributes = getAttributesForWriteFailure(bwe);
                    ff = session.putAllAttributes(ff, failureAttributes);

                    session.transfer(ff, REL_FAILURE);
                } else {
                    logger.debug("Routing FlowFile ID {} with Index {} to Success", new Object[]{ff.getId(), i});
                    // Flow File did not have error, so route to success
                    session.transfer(ff, REL_SUCCESS);
                }
            }
        } else {
            logger.debug("No errors encountered on bulk write, so routing all to success");
            // All succeeded, so write all to success
            session.transfer(flowFilesAttemptedUpdate, REL_SUCCESS);
        }
    }
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:41,代码来源:UpdateMongo.java


示例3: assertDuplicateKeyException

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
/**
 * Ensures current exception has been generated due to a duplicate (primary) key.
 * Differentiates between Fongo and Mongo exceptions since the behaviour under these databases
 * is different.
 */
public static void assertDuplicateKeyException(Throwable exception) {
  Preconditions.checkNotNull(exception, "exception");

  // unwrap, if necessary
  exception = exception instanceof MongoException ? exception : exception.getCause();

  // fongo throws directly DuplicateKeyException
  if (exception instanceof DuplicateKeyException) return;

  // MongoDB throws custom exception
  if (exception instanceof MongoCommandException) {
    String codeName = ((MongoCommandException) exception).getResponse().get("codeName").asString().getValue();
    int errorCode = ((MongoCommandException) exception).getErrorCode();

    check(codeName).is("DuplicateKey");
    check(errorCode).is(11000);

    // all good here (can return)
    return;
  }

  // for bulk writes as well
  if (exception instanceof MongoBulkWriteException) {
    List<BulkWriteError> errors = ((MongoBulkWriteException) exception).getWriteErrors();
    check(errors).hasSize(1);
    check(errors.get(0).getCode()).is(11000);
    check(errors.get(0).getMessage()).contains("duplicate key");
    return;
  }

  // if we got here means there is a problem (no duplicate key exception)
  fail("Should get duplicate key exception after " + exception);
}
 
开发者ID:immutables,项目名称:immutables,代码行数:39,代码来源:MongoAsserts.java


示例4: executeBulkUpdate

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
protected Map<Integer, BulkWriteError> executeBulkUpdate(List<UpdateManyModel<Document>> documentsToUpdate) {
    // mapping of array indices for flow file errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();
    try {
        collection.bulkWrite(documentsToUpdate);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Error occurred during bulk write", e);
    }
    return writeErrors;
}
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:15,代码来源:UpdateMongo.java


示例5: getAttributesForWriteFailure

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
protected Map<String, String> getAttributesForWriteFailure(BulkWriteError bwe) {
    Map<String, String> failureAttributes = new HashMap<>();
    failureAttributes.put("mongo.errorcode", String.valueOf(bwe.getCode()));
    failureAttributes.put("mongo.errormessage", bwe.getMessage());
    return failureAttributes;
}
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:7,代码来源:UpdateMongo.java


示例6: onTrigger

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

    final List<FlowFile> flowFiles = session.get(batchSize);
    if (flowFiles == null) {
        return;
    }

    ComponentLog logger = this.getLogger();

    final String source = context.getProperty(INSERT_COMMAND_SOURCE).getValue();

    List<InsertOneModel<Document>> documentsToInsert = new ArrayList<>(flowFiles.size());

    /*
     * Collect FlowFiles that are marked for bulk insertion. Matches same
     * index as documentsToInsert
     */
    List<FlowFile> flowFilesAttemptedInsert = new ArrayList<>();

    logger.debug("Attempting to batch insert {} FlowFiles", new Object[]{flowFiles.size()});
    for (FlowFile flowFile : flowFiles) {

        final String payload;

        try {
            switch (source) {
                case "content":
                    final String[] result = new String[1];
                    session.read(flowFile, (in) -> result[0] = IOUtils.toString(in));
                    payload = result[0];
                    break;
                case "attribute":
                    String command = context.getProperty(INSERT_COMMAND_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
                    payload = flowFile.getAttribute(command);
                    break;
                default:
                    throw new Exception("Invalid source choice: " + source);
            }

            BasicDBObject parse = (BasicDBObject) JSON.parse(payload);
            Document documentToInsert = new Document(parse.toMap());
            logger.debug("Creating InsertOneModel with Document {}", new Object[]{documentToInsert});

            InsertOneModel<Document> iom = new InsertOneModel<>(documentToInsert);
            documentsToInsert.add(iom);

        } catch (Exception e) {
            /*
             * If any FlowFiles error on translation to a Mongo Object, they were not added to
             * the documentsToInsert, so route to failure immediately
             */
            logger.error("Encountered exception while processing FlowFile for Mongo Storage. Routing to failure and continuing.", e);
            FlowFile failureFlowFile = session.putAttribute(flowFile, "mongo.exception", e.getMessage());
            session.transfer(failureFlowFile, REL_FAILURE);
            continue;
        }

        // add to the ordered list so we can determine which fail on bulk
        // write
        flowFilesAttemptedInsert.add(flowFile);
    }

    /*
     * Perform the bulk insert if any documents are there to insert
     */
    if (!documentsToInsert.isEmpty()) {
        logger.debug("Attempting to bulk insert {} documents", new Object[]{documentsToInsert.size()});
        Map<Integer, BulkWriteError> writeErrors = executeBulkInsert(documentsToInsert);

        /*
         * Route FlowFiles to the proper relationship based on the returned
         * errors
         */
        logger.debug("Evaluating FlowFile routing against {} Write Errors for {} FlowFiles", new Object[]{writeErrors.size(), flowFilesAttemptedInsert.size()});
        transferFlowFiles(session, flowFilesAttemptedInsert, writeErrors);
    }
}
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:79,代码来源:StoreInMongo.java


示例7: executeBulkInsert

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
protected Map<Integer, BulkWriteError> executeBulkInsert(List<InsertOneModel<Document>> documentsToInsert) {
    // mapping of array indices for flow file errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();
    try {
        collection.bulkWrite(documentsToInsert, writeOptions);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Unable to perform bulk inserts", e);
    }
    return writeErrors;
}
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:15,代码来源:StoreInMongo.java


示例8: transferFlowFiles

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
protected void transferFlowFiles(ProcessSession session, List<FlowFile> flowFilesAttemptedInsert, Map<Integer, BulkWriteError> writeErrors) {

        ComponentLog logger = this.getLogger();

        if (!writeErrors.isEmpty()) {
            logger.debug("Encountered errors on write");
            /*
             * For each Bulk Inserted Document, see if it encountered an error.
             * If it had an error (based on index in the list), add the Mongo
             * Error to the FlowFile attribute and route to Failure. Otherwise,
             * route to Success
             */
            int numFlowfiles = flowFilesAttemptedInsert.size();
            for (int i = 0; i < numFlowfiles; i++) {
                FlowFile ff = flowFilesAttemptedInsert.get(i);
                if (writeErrors.containsKey(i)) {

                    logger.debug("Found error for FlowFile index {}", new Object[]{i});

                    // Add the error information to the FlowFileAttributes, and
                    // route to failure
                    BulkWriteError bwe = writeErrors.get(i);

                    logger.debug("FlowFile ID {} had Error Code {} and Message {}", new Object[]{ff.getId(), bwe.getCode(), bwe.getMessage()});

                    Map<String, String> failureAttributes = getAttributesForWriteFailure(bwe);
                    ff = session.putAllAttributes(ff, failureAttributes);

                    session.transfer(ff, REL_FAILURE);

                    // If ordered=true, mongo will stop processing insert attempts after the first failure in a batch
                    if (writeOptions.isOrdered()) {
                        logger.debug("Routing all flowfiles after FlowFile ID {} with Index {} to Failure because an error occurred and ordered=true",
                                new Object[]{ff.getId(), i});
                        for (int j = i + 1; j < numFlowfiles; j++) {
                            ff = flowFilesAttemptedInsert.get(j);
                            ff = session.putAttribute(ff, "storeinmongo.error", "Insert not attempted because there was a failure earlier in batch and ordered=true");
                            session.transfer(ff, REL_FAILURE);
                        }
                        break;
                    }
                } else {
                    logger.debug("Routing FlowFile ID {} with Index {} to Success", new Object[]{ff.getId(), i});
                    // Flow File did not have error, so route to success
                    session.transfer(ff, REL_SUCCESS);
                }
            }
        } else {
            logger.debug("No errors encountered on bulk write, so routing all to success");
            // All succeeded, so write all to success
            session.transfer(flowFilesAttemptedInsert, REL_SUCCESS);
        }
    }
 
开发者ID:Asymmetrik,项目名称:nifi-nars,代码行数:54,代码来源:StoreInMongo.java


示例9: printDuplicatedException

import com.mongodb.bulk.BulkWriteError; //导入依赖的package包/类
/**
 * MongoBulkWriteException contains error messages that inform
 * which documents were duplicated. This method catches those ID and print them.
 * @param e
 */
private void printDuplicatedException(MongoBulkWriteException e) {
  List<BulkWriteError> errors = e.getWriteErrors();
  for (BulkWriteError error : errors) {
    String msg = error.getMessage();
    Pattern pattern = Pattern.compile("[A-Z0-9]{9}"); // regex for note ID
    Matcher matcher = pattern.matcher(msg);
    if (matcher.find()) { // if there were a note ID
      String noteId = matcher.group();
      LOG.warn("Note " + noteId + " not inserted since already exists in MongoDB");
    }
  }
}
 
开发者ID:apache,项目名称:zeppelin,代码行数:18,代码来源:MongoNotebookRepo.java



注:本文中的com.mongodb.bulk.BulkWriteError类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ModuleFactory类代码示例发布时间:2022-05-22
下一篇:
Java FuelLevelCommand类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap