• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java KuduTable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kudu.client.KuduTable的典型用法代码示例。如果您正苦于以下问题:Java KuduTable类的具体用法?Java KuduTable怎么用?Java KuduTable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



KuduTable类属于org.apache.kudu.client包,在下文中一共展示了KuduTable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: applyRandomMutations

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
@Override
public void applyRandomMutations(List<PlannedRow> planned) throws Exception {
  KuduTable table = connectToTable();

  List<Operation> operations = extractOperations(planned, table);

  for (Operation operation : operations) {
    session.apply(operation);
  }

  // Wait until all operations have completed before checking for errors.
  while (session.hasPendingOperations()) {
    Thread.sleep(1);
  }

  // Fail fast on any error applying mutations
  if (session.countPendingErrors() > 0) {
    RowError firstError = session.getPendingErrors().getRowErrors()[0];
    String errorMessage = String.format("Kudu output error '%s' during operation '%s' at tablet server '%s'",
        firstError.getErrorStatus(), firstError.getOperation(), firstError.getTsUUID());

    throw new RuntimeException(errorMessage);
  }
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:25,代码来源:KuduOutput.java


示例2: getExistingForFilters

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
@Override
public Iterable<Row> getExistingForFilters(Iterable<Row> filters) throws Exception {
  List<Row> existingForFilters = Lists.newArrayList();

  if (!filters.iterator().hasNext()) {
    return existingForFilters;
  }

  KuduTable table = connectToTable();
  KuduScanner scanner = scannerForFilters(filters, table);

  long startTime = System.nanoTime();
  while (scanner.hasMoreRows()) {
    for (RowResult rowResult : scanner.nextRows()) {
      Row existing = resultAsRow(rowResult, table);

      existingForFilters.add(existing);
    }
  }
  long endTime = System.nanoTime();
  if (hasAccumulators()) {
    accumulators.getDoubleAccumulators().get(ACCUMULATOR_SECONDS_SCANNING).add((endTime - startTime) / 1000.0 / 1000.0 / 1000.0);
  }

  return existingForFilters;
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:27,代码来源:KuduOutput.java


示例3: connectToTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private synchronized KuduTable connectToTable() throws KuduException {
  if (client == null) {
    LOG.info("Connecting to Kudu");

    String masterAddresses = config.getString(CONNECTION_CONFIG_NAME);

    client = new KuduClient.KuduClientBuilder(masterAddresses).build();
    session = client.newSession();
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND);
    session.setMutationBufferSpace(10000);
    session.setIgnoreAllDuplicateRows(isInsertIgnore());

    LOG.info("Connection to Kudu established");
  }

  String tableName = config.getString(TABLE_CONFIG_NAME);
  KuduTable table = getTable(tableName);

  return table;
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:21,代码来源:KuduOutput.java


示例4: Tagsets

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
Tagsets(AsyncKuduClient client, Tags tags, KuduTable tagsetsTable) {
  this.client = client;
  this.tagsetsTable = tagsetsTable;
  this.tags = tags;
  this.columnIndexes = ImmutableList.of(Tables.TAGSETS_ID_INDEX,
                                        Tables.TAGSETS_TAGSET_INDEX);
  this.tagsets = CacheBuilder.newBuilder()
      .maximumSize(1024 * 1024)
      .build(new CacheLoader<SerializedTagset, Deferred<Integer>>() {
        @Override
        public Deferred<Integer> load(SerializedTagset tagset) {
          return lookupOrInsertTagset(tagset, hashForTesting == null ?
                                              tagset.hashCode() : hashForTesting);
        }
      });
}
 
开发者ID:danburkert,项目名称:kudu-ts,代码行数:17,代码来源:Tagsets.java


示例5: open

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
/**
 * Opens a Kudu TS instance on a Kudu cluster.
 *
 * @param kuduMasterAddressess list of "host:port" pair master addresses
 * @param name the name of the Kudu timeseries store. Multiple instances of
 *             Kudu TS can occupy the same Kudu cluster by using a different name.
 * @return the opened {@code KuduTS}.
 * @throws Exception on error
 */
public static KuduTS open(List<String> kuduMasterAddressess, String name) throws Exception {
  AsyncKuduClient client = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasterAddressess).build();

  Deferred<KuduTable> metricsDeferred = client.openTable(Tables.metricsTableName(name));
  Deferred<KuduTable> tagsetsDeferred = client.openTable(Tables.tagsetsTableName(name));
  Deferred<KuduTable> tagsDeferred = client.openTable(Tables.tagsTableName(name));
  KuduTable metricsTable = metricsDeferred.join(client.getDefaultAdminOperationTimeoutMs());
  KuduTable tagsetsTable = tagsetsDeferred.join(client.getDefaultAdminOperationTimeoutMs());
  KuduTable tagsTable = tagsDeferred.join(client.getDefaultAdminOperationTimeoutMs());

  Tags tags = new Tags(client, tagsTable);
  Tagsets tagsets = new Tagsets(client, tags, tagsetsTable);
  Metrics metrics = new Metrics(client, metricsTable, tagsets);
  return new KuduTS(client, name, metrics, tagsets, tags);
}
 
开发者ID:danburkert,项目名称:kudu-ts,代码行数:25,代码来源:KuduTS.java


示例6: openOrCreateTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private static Deferred<KuduTable> openOrCreateTable(final AsyncKuduClient client,
                                                     final String table,
                                                     final Schema schema,
                                                     final CreateTableOptions options) throws Exception {
  class CreateTableErrback implements Callback<Deferred<KuduTable>, Exception> {
    @Override
    public Deferred<KuduTable> call(Exception e) throws Exception {
      // TODO(danburkert): we should only do this if the error is "not found"
      LOG.debug("Creating table {}", table);
      return client.createTable(table, schema, options);
    }
    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this).add("table", table).toString();
    }
  }

  return client.openTable(table).addErrback(new CreateTableErrback());
}
 
开发者ID:danburkert,项目名称:kudu-ts,代码行数:20,代码来源:KuduTS.java


示例7: getKuduScanTokensForSelectAllColumns

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
/**
 * Builds a set of scan tokens. The list of scan tokens are generated as if the entire table is being scanned
 * i.e. a SELECT * FROM TABLE equivalent expression. This list is used to assign the partition pie assignments
 * for all of the planned partition of operators. Each operator gets a part of the PIE as if all columns were
 * selected. Subsequently when a query is to be processed, the query is used to generate the scan tokens applicable
 * for that query. Given that partition pie represents the entire data set, the scan assignments for the current
 * query will be a subset.
 * @return The list of scan tokens as if the entire table is getting scanned.
 * @throws Exception in cases when the connection to kudu cluster cannot be closed.
 */
public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception
{
  // We are not using the current query for deciding the partition strategy but a SELECT * as
  // we do not want to want to optimize on just the current query. This prevents rapid throttling of operator
  // instances when the scan patterns are erratic. On the other hand, this might result on under utilized
  // operator resources in the DAG but will be consistent at a minimum.
  ApexKuduConnection apexKuduConnection = prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
  KuduClient clientHandle = apexKuduConnection.getKuduClient();
  KuduTable table = apexKuduConnection.getKuduTable();
  KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
  List<String> allColumns = new ArrayList<>();
  List<ColumnSchema> columnList = apexKuduConnection.getKuduTable().getSchema().getColumns();
  for ( ColumnSchema column : columnList) {
    allColumns.add(column.getName());
  }
  builder.setProjectedColumnNames(allColumns);
  LOG.debug("Building the partition pie assignments for the input operator");
  List<KuduScanToken> allPossibleTokens = builder.build();
  apexKuduConnection.close();
  return allPossibleTokens;
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:32,代码来源:AbstractKuduInputPartitioner.java


示例8: buildColumnSchemaForTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
/**
 * Scans the metadata for the kudu table that this operator is scanning for and
 * returns back the mapping for the kudu column name to the ColumnSchema metadata definition.
 * Note that the Kudu columns names are case sensitive.
 * @return A Map with Kudu column names as keys and value as the Column Definition.
 * @throws Exception
 */
private Map<String,ColumnSchema> buildColumnSchemaForTable() throws Exception
{
  if (kuduColNameToSchemaMapping == null) {
    ApexKuduConnection connectionForMetaDataScan = apexKuduConnectionInfo.build();
    KuduTable table = connectionForMetaDataScan.getKuduTable();
    List<ColumnSchema> tableColumns =  table.getSchema().getColumns();
    connectionForMetaDataScan.close();
    Map<String,ColumnSchema> columnSchemaMap = new HashMap<>();
    for (ColumnSchema aColumn: tableColumns) {
      columnSchemaMap.put(aColumn.getName(),aColumn);
    }
    kuduColNameToSchemaMapping = columnSchemaMap;
  }
  return kuduColNameToSchemaMapping;
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:23,代码来源:AbstractKuduInputOperator.java


示例9: getOperation

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
/**
 * Return Operation based on the operation code. If the code has a number
 * that Kudu destination doesn't support, it throws UnsupportedOperationException.
 * @param table
 * @param op
 * @return
 * @throws UnsupportedOperationException
 */
protected Operation getOperation(KuduTable table, int op) throws UnsupportedOperationException {
  Operation operation = null;
  switch (op) {
    case OperationType.INSERT_CODE:
      operation = table.newInsert();
      break;
    case OperationType.UPSERT_CODE:
      operation = table.newUpsert();
      break;
    case OperationType.UPDATE_CODE:
      operation = table.newUpdate();
      break;
    case OperationType.DELETE_CODE:
      operation = table.newDelete();
      break;
    default:
      LOG.error("Operation {} not supported", op);
      throw new UnsupportedOperationException(String.format("Unsupported Operation: %s", op));
  }
  return operation;
}
 
开发者ID:streamsets,项目名称:datacollector,代码行数:30,代码来源:KuduTarget.java


示例10: openTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
public KuduTable openTable(KuduClient client, final String name)

    {
        KuduTable kuduTable = null;
        try {
            kuduTable = client.openTable(name);
        }
        catch (KuduException e) {
            log.error(e, e.getMessage());
        }
        return kuduTable;
    }
 
开发者ID:trackingio,项目名称:presto-kudu,代码行数:13,代码来源:KuduClientManager.java


示例11: resultAsRow

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private Row resultAsRow(RowResult result, KuduTable table) throws KuduException {
  List<Object> values = Lists.newArrayList();

  for (ColumnSchema columnSchema : table.getSchema().getColumns()) {
    String columnName = columnSchema.getName();

    if (result.isNull(columnName)) {
      values.add(null);
      continue;
    }

    switch (columnSchema.getType()) {
      case DOUBLE:
        values.add(result.getDouble(columnName));
        break;
      case FLOAT:
        values.add(result.getFloat(columnName));
        break;
      case INT32:
        values.add(result.getInt(columnName));
        break;
      case INT64:
        values.add(result.getLong(columnName));
        break;
      case STRING:
        values.add(result.getString(columnName));
        break;
      case BOOL:
        values.add(result.getBoolean(columnName));
        break;
      default:
        throw new RuntimeException("Unsupported Kudu column type: " + columnSchema.getType());
    }
  }

  Row row = new RowWithSchema(getTableSchema(table), values.toArray());

  return row;
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:40,代码来源:KuduOutput.java


示例12: schemaFor

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private StructType schemaFor(KuduTable table) {
  List<String> fieldNames = Lists.newArrayList();
  List<String> fieldTypes = Lists.newArrayList();

  for (ColumnSchema columnSchema : table.getSchema().getColumns()) {
    String fieldName = columnSchema.getName();
    String fieldType;

    switch (columnSchema.getType()) {
      case DOUBLE:
        fieldType = "double";
        break;
      case FLOAT:
        fieldType = "float";
        break;
      case INT32:
        fieldType = "int";
        break;
      case INT64:
        fieldType = "long";
        break;
      case STRING:
        fieldType = "string";
        break;
      case BOOL:
        fieldType = "boolean";
        break;
      default:
        throw new RuntimeException("Unsupported Kudu column type: " + columnSchema.getType());
    }

    fieldNames.add(fieldName);
    fieldTypes.add(fieldType);
  }

  StructType tableSchema = RowUtils.structTypeFor(fieldNames, fieldTypes);

  return tableSchema;
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:40,代码来源:KuduOutput.java


示例13: scannerForFilters

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private KuduScanner scannerForFilters(Iterable<Row> filters, KuduTable table) {
  List<Row> filtersList = Lists.newArrayList(filters);

  if (filtersList.size() == 0) {
    throw new RuntimeException("Kudu existing filter was not provided.");
  }
  
  if (filtersList.get(0).schema() == null) {
    throw new RuntimeException("Kudu existing filter did not contain a schema.");
  }
  
  if (hasAccumulators()) {
    accumulators.getLongAccumulators().get(ACCUMULATOR_NUMBER_OF_SCANNERS).add(1);
    accumulators.getLongAccumulators().get(ACCUMULATOR_NUMBER_OF_FILTERS_SCANNED).add(filtersList.size());
  }
  
  KuduScannerBuilder builder = client.newScannerBuilder(table);

  for (String fieldName : filtersList.get(0).schema().fieldNames()) {
    ColumnSchema columnSchema = table.getSchema().getColumn(fieldName);

    List<Object> columnValues = Lists.newArrayList();
    for (Row filter : filtersList) {
      Object columnValue = RowUtils.get(filter, fieldName);
      columnValues.add(columnValue);
    }

    KuduPredicate predicate = KuduPredicate.newInListPredicate(columnSchema, columnValues);

    builder = builder.addPredicate(predicate);
  }

  KuduScanner scanner = builder.build();

  return scanner;
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:37,代码来源:KuduOutput.java


示例14: getTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private synchronized KuduTable getTable(String tableName) throws KuduException {
  if (tables == null) {
    tables = Maps.newHashMap();
  }

  if (tables.containsKey(tableName)) {
    return tables.get(tableName);
  }
  else {
    KuduTable table = client.openTable(tableName);
    tables.put(tableName, table);
    return table;
  }
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:15,代码来源:KuduOutput.java


示例15: getTableSchema

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
private synchronized StructType getTableSchema(KuduTable table) throws KuduException {
  if (tableSchemas == null) {
    tableSchemas = Maps.newHashMap();
  }

  if (tableSchemas.containsKey(table.getName())) {
    return tableSchemas.get(table.getName());
  }
  else {
    StructType tableSchema = schemaFor(table);
    tableSchemas.put(table.getName(), tableSchema);
    return tableSchema;
  }
}
 
开发者ID:cloudera-labs,项目名称:envelope,代码行数:15,代码来源:KuduOutput.java


示例16: openOrCreate

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
/**
 * Creates (if necessary) and opens a Kudu TS instance on a Kudu cluster.
 *
 * @param kuduMasterAddressess list of "host:port" pair master addresses
 * @param name the name of the Kudu timeseries store. Multiple instances of
 *             Kudu TS can occupy the same Kudu cluster by using a different name.
 * @return the opened {@code KuduTS}.
 * @throws Exception on error
 */
public static KuduTS openOrCreate(List<String> kuduMasterAddressess,
                                  String name,
                                  CreateOptions options) throws Exception {

  AsyncKuduClient client = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasterAddressess).build();

  int numTabletServers = client.listTabletServers()
                               .joinUninterruptibly(client.getDefaultAdminOperationTimeoutMs())
                               .getTabletServersCount();

  Deferred<KuduTable> metricsDeferred = openOrCreateTable(client,
                                                          Tables.metricsTableName(name),
                                                          Tables.METRICS_SCHEMA,
                                                          Tables.metricsCreateTableOptions(options, numTabletServers));
  Deferred<KuduTable> tagsetsDeferred = openOrCreateTable(client,
                                                          Tables.tagsetsTableName(name),
                                                          Tables.TAGSETS_SCHEMA,
                                                          Tables.tagsetsCreateTableOptions(options, numTabletServers));
  Deferred<KuduTable> tagsDeferred = openOrCreateTable(client,
                                                       Tables.tagsTableName(name),
                                                       Tables.TAGS_SCHEMA,
                                                       Tables.tagsCreateTableOptions(options, numTabletServers));
  KuduTable metricsTable = metricsDeferred.join(client.getDefaultAdminOperationTimeoutMs());
  KuduTable tagsetsTable = tagsetsDeferred.join(client.getDefaultAdminOperationTimeoutMs());
  KuduTable tagsTable = tagsDeferred.join(client.getDefaultAdminOperationTimeoutMs());

  Tags tags = new Tags(client, tagsTable);
  Tagsets tagsets = new Tagsets(client, tags, tagsetsTable);
  Metrics metrics = new Metrics(client, metricsTable, tagsets);
  return new KuduTS(client, name, metrics, tagsets, tags);
}
 
开发者ID:danburkert,项目名称:kudu-ts,代码行数:41,代码来源:KuduTS.java


示例17: getTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
@Override
public Table getTable(String name) {
  KuduScanSpec scanSpec = new KuduScanSpec(name);
  try {
    KuduTable table = plugin.getClient().openTable(name);
    Schema schema = table.getSchema();
    return new DrillKuduTable(schemaName, plugin, schema, scanSpec);
  } catch (Exception e) {
    logger.warn("Failure while retrieving kudu table {}", name, e);
    return null;
  }

}
 
开发者ID:axbaretto,项目名称:drill,代码行数:14,代码来源:KuduSchemaFactory.java


示例18: setup

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
  this.output = output;
  this.context = context;
  try {
    KuduTable table = client.openTable(scanSpec.getTableName());

    KuduScannerBuilder builder = client.newScannerBuilder(table);
    if (!isStarQuery()) {
      List<String> colNames = Lists.newArrayList();
      for (SchemaPath p : this.getColumns()) {
        colNames.add(p.getRootSegmentPath());
      }
      builder.setProjectedColumnNames(colNames);
    }

    context.getStats().startWait();
    try {
      scanner = builder
          .lowerBoundRaw(scanSpec.getStartKey())
          .exclusiveUpperBoundRaw(scanSpec.getEndKey())
          .build();
    } finally {
      context.getStats().stopWait();
    }
  } catch (Exception e) {
    throw new ExecutionSetupException(e);
  }
}
 
开发者ID:axbaretto,项目名称:drill,代码行数:30,代码来源:KuduRecordReader.java


示例19: truncateTable

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
public void truncateTable() throws Exception
{
  AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForDeletingRows =
      unitTestStepwiseScanInputOperator.getScanner();
  List<KuduScanToken> scansForAllTablets = unitTestStepwiseScanInputOperator
      .getPartitioner().getKuduScanTokensForSelectAllColumns();
  ApexKuduConnection aCurrentConnection = scannerForDeletingRows.getConnectionPoolForThreads().get(0);
  KuduSession aSessionForDeletes = aCurrentConnection.getKuduClient().newSession();
  KuduTable currentTable = aCurrentConnection.getKuduTable();
  for ( KuduScanToken aTabletScanToken : scansForAllTablets) {
    KuduScanner aScanner = aTabletScanToken.intoScanner(aCurrentConnection.getKuduClient());
    while ( aScanner.hasMoreRows()) {
      RowResultIterator itrForRows = aScanner.nextRows();
      while ( itrForRows.hasNext()) {
        RowResult aRow = itrForRows.next();
        int intRowKey = aRow.getInt("introwkey");
        String stringRowKey = aRow.getString("stringrowkey");
        long timestampRowKey = aRow.getLong("timestamprowkey");
        Delete aDeleteOp = currentTable.newDelete();
        aDeleteOp.getRow().addInt("introwkey",intRowKey);
        aDeleteOp.getRow().addString("stringrowkey", stringRowKey);
        aDeleteOp.getRow().addLong("timestamprowkey",timestampRowKey);
        aSessionForDeletes.apply(aDeleteOp);
      }
    }
  }
  aSessionForDeletes.close();
  Thread.sleep(2000); // Sleep to allow for scans to complete
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:30,代码来源:KuduInputOperatorCommons.java


示例20: addTestDataRows

import org.apache.kudu.client.KuduTable; //导入依赖的package包/类
public void addTestDataRows(int numRowsInEachPartition) throws Exception
{
  int intRowKeyStepsize = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY;
  int splitBoundaryForIntRowKey = intRowKeyStepsize;
  int[] inputrowkeyPartitionEntries = new int[SPLIT_COUNT_FOR_INT_ROW_KEY + 1];
  // setting the int keys that will fall in the range of all partitions
  for ( int i = 0; i < SPLIT_COUNT_FOR_INT_ROW_KEY; i++) {
    inputrowkeyPartitionEntries[i] = splitBoundaryForIntRowKey + 3; // 3 to fall into the partition next to boundary
    splitBoundaryForIntRowKey += intRowKeyStepsize;
  }
  inputrowkeyPartitionEntries[SPLIT_COUNT_FOR_INT_ROW_KEY] = splitBoundaryForIntRowKey + 3;
  AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForAddingRows =
      unitTestStepwiseScanInputOperator.getScanner();
  ApexKuduConnection aCurrentConnection = scannerForAddingRows.getConnectionPoolForThreads().get(0);
  KuduSession aSessionForInserts = aCurrentConnection.getKuduClient().newSession();
  KuduTable currentTable = aCurrentConnection.getKuduTable();
  long seedValueForTimestampRowKey = 0L; // constant to allow for data landing on first partition for unit tests
  for ( int i = 0; i <= SPLIT_COUNT_FOR_INT_ROW_KEY; i++) { // range key iterator
    int intRowKeyBaseValue = inputrowkeyPartitionEntries[i] + i;
    for ( int k = 0; k < 2; k++) { // hash key iterator . The table defines two hash partitions
      long timestampRowKeyValue = seedValueForTimestampRowKey + k; // to avoid spilling to another tablet
      String stringRowKeyValue = "" + timestampRowKeyValue + k; // to avoid spilling to another tablet randomly
      for ( int y = 0; y < numRowsInEachPartition; y++) {
        Upsert aNewRow = currentTable.newUpsert();
        PartialRow rowValue  = aNewRow.getRow();
        // Start assigning row keys below the current split boundary.
        rowValue.addInt("introwkey",intRowKeyBaseValue - y - 1);
        rowValue.addString("stringrowkey",stringRowKeyValue);
        rowValue.addLong("timestamprowkey",timestampRowKeyValue);
        rowValue.addLong("longdata",(seedValueForTimestampRowKey + y));
        rowValue.addString("stringdata", ("" + seedValueForTimestampRowKey + y));
        OperationResponse response = aSessionForInserts.apply(aNewRow);
      }
    }
  }
  List<OperationResponse> insertResponse = aSessionForInserts.flush();
  aSessionForInserts.close();
  Thread.sleep(2000); // Sleep to allow for scans to complete
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:40,代码来源:KuduInputOperatorCommons.java



注:本文中的org.apache.kudu.client.KuduTable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java CanvasFrame类代码示例发布时间:2022-05-22
下一篇:
Java SchemeRegistry类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap