本文整理汇总了Java中org.embulk.spi.util.Pages类的典型用法代码示例。如果您正苦于以下问题：Java Pages类的具体用法？Java Pages怎么用？Java Pages使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
Pages类属于org.embulk.spi.util包，在下文中一共展示了Pages类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: skipRecords
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Feeds the parser a JSON array whose elements are all non-object values
 * (an empty array, a string, a number, booleans, null and blank text) and
 * checks that not a single record is produced from them.
 */
@Test
public void skipRecords()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE),
            column("_c3", STRING),
            column("_c4", TIMESTAMP),
            column("_c5", JSON));
    final ConfigSource baseCopy = this.config.deepCopy();
    final ConfigSource parserConfig = baseCopy.set("columns", columns);

    transaction(parserConfig, fileInput(
            "[",
            "[]",
            "\"embulk\"",
            "10",
            "true",
            "false",
            "null",
            " ",
            "]"));

    // Nothing in the input is a JSON object, so the output must stay empty.
    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(0, parsed.size());
}
开发者ID:hiroyuki-sato,项目名称:embulk-parser-jsonpath,代码行数:25,代码来源:TestJsonpathParserPlugin.java
示例2: booleanStrings
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Parses one JSON object whose twelve BOOLEAN columns are all given as
 * strings and checks the value each string is converted to.
 *
 * <p>{@code _c1}..{@code _c6} ("yes", "true", "1", "on", "y", "t") are expected
 * to be read as {@code true}; {@code _c7}..{@code _c12} ("no", "false", "0",
 * "off", "n", "f") are expected to be read as {@code false}.
 */
@Test
public void booleanStrings()
        throws Exception
{
    SchemaConfig schema = schema(column("_c1", BOOLEAN), column("_c2", BOOLEAN),
            column("_c3", BOOLEAN), column("_c4", BOOLEAN), column("_c5", BOOLEAN),
            column("_c6", BOOLEAN), column("_c7", BOOLEAN), column("_c8", BOOLEAN),
            column("_c9", BOOLEAN), column("_c10", BOOLEAN), column("_c11", BOOLEAN),
            column("_c12", BOOLEAN));
    ConfigSource config = this.config.deepCopy().set("columns", schema);
    transaction(config, fileInput("[{\"_c1\" : \"yes\", \"_c2\" : \"true\", \"_c3\" : \"1\",",
            "\"_c4\" : \"on\", \"_c5\" : \"y\", \"_c6\" : \"t\",",
            "\"_c7\" : \"no\", \"_c8\" : \"false\", \"_c9\" : \"0\"," ,
            "\"_c10\" : \"off\", \"_c11\" : \"n\", \"_c12\" : \"f\"}]"));
    List<Object[]> records = Pages.toObjects(schema.toSchema(), output.pages);
    assertEquals(1, records.size());
    Object[] record = records.get(0);
    // Indexes 0-5 hold the truthy spellings. The bound is 6 (not 5) so that
    // the last one, "t" in _c6, is verified as well.
    for (int i = 0; i < 6; i++) {
        assertTrue((boolean) record[i]);
    }
    // Indexes 6-11 hold the falsy spellings. The bound is 12 (not 11) so that
    // the last one, "f" in _c12, is verified as well.
    for (int i = 6; i < 12; i++) {
        assertFalse((boolean) record[i]);
    }
}
开发者ID:hiroyuki-sato,项目名称:embulk-parser-jsonpath,代码行数:27,代码来源:TestJsonpathParserPlugin.java
示例3: filter
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Builds input pages from {@code objects}, passes every record through a
 * {@link BuildJsonVisitorImpl} and returns the records that were written to
 * the mock output.
 *
 * @param task plugin task used to derive the output schema and the visitor
 * @param inputSchema schema describing the layout of {@code objects}
 * @param objects flat list of column values used to build the input pages
 * @return the output records converted with {@code Pages.toObjects}
 */
private List<Object[]> filter(PluginTask task, Schema inputSchema, Object ... objects)
{
    TestPageBuilderReader.MockPageOutput output = new TestPageBuilderReader.MockPageOutput();
    Schema outputSchema = BuildJsonFilterPlugin.buildOutputSchema(task, inputSchema);
    // try-with-resources closes the builder and the reader even when the
    // visitor throws, instead of only on the success path.
    try (PageBuilder pageBuilder = new PageBuilder(runtime.getBufferAllocator(), outputSchema, output);
            PageReader pageReader = new PageReader(inputSchema)) {
        BuildJsonVisitorImpl visitor = new BuildJsonVisitorImpl(task, inputSchema, outputSchema, pageReader, pageBuilder);
        List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), inputSchema, objects);
        for (Page page : pages) {
            pageReader.setPage(page);
            while (pageReader.nextRecord()) {
                outputSchema.visitColumns(visitor);
                pageBuilder.addRecord();
            }
        }
        // Must run before the builder is closed and before output.pages is read.
        pageBuilder.finish();
    }
    return Pages.toObjects(outputSchema, output.pages);
}
开发者ID:hiroyuki-sato,项目名称:embulk-filter-build_json,代码行数:22,代码来源:TestBuildJsonVisitorImpl.java
示例4: filter
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Builds input pages from {@code objects}, lets the plugin's
 * {@code setValue} fill every output record and returns the records that
 * were written to the mock output.
 *
 * @param task plugin task providing the base58 column definitions
 * @param inputSchema schema describing the layout of {@code objects}
 * @param objects flat list of column values used to build the input pages
 * @return the output records converted with {@code Pages.toObjects}
 */
private List<Object[]> filter(PluginTask task, Schema inputSchema, Object ... objects)
{
    MockPageOutput output = new MockPageOutput();
    Schema outputSchema = plugin.buildOutputSchema(task, inputSchema);
    // try-with-resources closes the builder and the reader even when
    // setValue throws, instead of only on the success path.
    try (PageBuilder pageBuilder = new PageBuilder(runtime.getBufferAllocator(), outputSchema, output);
            PageReader pageReader = new PageReader(inputSchema)) {
        final Map<String, List<Base58FilterPlugin.Base58Column>> base58ColumnMap = Base58FilterPlugin.convertBase58ColumnListToMap(task.getColumns());
        final Map<String, Column> outputColumnMap = Base58FilterPlugin.convertColumnListToMap(outputSchema.getColumns(), null);
        List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), inputSchema, objects);
        for (Page page : pages) {
            pageReader.setPage(page);
            while (pageReader.nextRecord()) {
                plugin.setValue(base58ColumnMap, outputColumnMap, pageReader, outputSchema, pageBuilder);
                pageBuilder.addRecord();
            }
        }
        // Must run before the builder is closed and before output.pages is read.
        pageBuilder.finish();
    }
    return Pages.toObjects(outputSchema, output.pages);
}
开发者ID:kfitzgerald,项目名称:embulk-filter-base58,代码行数:23,代码来源:TestBase58FilterImpl.java
示例5: filter
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Builds input pages from {@code objects}, passes every record through a
 * {@link ColumnVisitorImpl} and returns the records that were written to
 * the mock output.
 *
 * @param task plugin task used to derive the output schema and the visitor
 * @param inputSchema schema describing the layout of {@code objects}
 * @param objects flat list of column values used to build the input pages
 * @return the output records converted with {@code Pages.toObjects}
 */
private List<Object[]> filter(PluginTask task, Schema inputSchema, Object ... objects)
{
    MockPageOutput output = new MockPageOutput();
    Schema outputSchema = ColumnFilterPlugin.buildOutputSchema(task, inputSchema);
    // try-with-resources closes the builder and the reader even when the
    // visitor throws, instead of only on the success path.
    try (PageBuilder pageBuilder = new PageBuilder(runtime.getBufferAllocator(), outputSchema, output);
            PageReader pageReader = new PageReader(inputSchema)) {
        ColumnVisitorImpl visitor = new ColumnVisitorImpl(task, inputSchema, outputSchema, pageReader, pageBuilder);
        List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), inputSchema, objects);
        for (Page page : pages) {
            pageReader.setPage(page);
            while (pageReader.nextRecord()) {
                outputSchema.visitColumns(visitor);
                pageBuilder.addRecord();
            }
        }
        // Must run before the builder is closed and before output.pages is read.
        pageBuilder.finish();
    }
    return Pages.toObjects(outputSchema, output.pages);
}
开发者ID:sonots,项目名称:embulk-filter-column,代码行数:22,代码来源:TestColumnVisitorImpl.java
示例6: skipRecords
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Feeds the parser input entries that are all non-object JSON values (an
 * empty array, a string, a number, booleans, null and blank text) and checks
 * that none of them yields a record.
 */
@Test
public void skipRecords()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE),
            column("_c3", STRING),
            column("_c4", TIMESTAMP),
            column("_c5", JSON));
    final ConfigSource baseCopy = this.config.deepCopy();
    final ConfigSource parserConfig = baseCopy.set("columns", columns);

    transaction(parserConfig, fileInput(
            "[]",
            "\"embulk\"",
            "10",
            "true",
            "false",
            "null",
            " "));

    // None of the entries is a JSON object, so the output must stay empty.
    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(0, parsed.size());
}
开发者ID:shun0102,项目名称:embulk-parser-jsonl,代码行数:23,代码来源:TestJsonlParserPlugin.java
示例7: writeNils
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Parses four JSON objects in which every column is either missing or
 * explicitly null and checks that all six columns of all four records
 * are null.
 */
@Test
public void writeNils()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE),
            column("_c3", STRING),
            column("_c4", TIMESTAMP),
            column("_c5", JSON));
    final ConfigSource baseCopy = this.config.deepCopy();
    final ConfigSource parserConfig = baseCopy.set("columns", columns);

    transaction(parserConfig, fileInput(
            "{}",
            "{\"_c0\":null,\"_c1\":null,\"_c2\":null}",
            "{\"_c3\":null,\"_c4\":null,\"_c5\":null}",
            "{}"));

    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(4, parsed.size());
    for (int row = 0; row < parsed.size(); row++) {
        final Object[] values = parsed.get(row);
        // Check each of the six schema columns in turn.
        for (int col = 0; col < 6; col++) {
            assertNull(values[col]);
        }
    }
}
开发者ID:shun0102,项目名称:embulk-parser-jsonl,代码行数:26,代码来源:TestJsonlParserPlugin.java
示例8: useNoColumnsOption
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Parses {@code items.avro} with only the {@code avsc} option set (no
 * {@code columns} option) and checks the record count plus selected values
 * of the first record.
 *
 * <p>Resource locations are converted with {@code new File(url.toURI())}
 * rather than {@code url.getPath()}. {@code URL.getPath()} returns the
 * percent-encoded form, which is not a usable file name when the directory
 * contains spaces or non-ASCII characters.
 */
@Test
public void useNoColumnsOption()
        throws Exception
{
    SchemaConfig schema = schema(
            column("id", LONG),
            column("code", LONG),
            column("name", STRING),
            column("description", STRING),
            column("flag", BOOLEAN),
            column("created_at", STRING),
            column("created_at_utc", DOUBLE),
            column("price", DOUBLE),
            column("spec", JSON),
            column("tags", JSON),
            column("options", JSON),
            column("item_type", STRING),
            column("dummy", STRING)
    );
    // Decode the resource URLs into real file-system paths.
    File avscFile = new File(this.getClass().getResource("item.avsc").toURI());
    File avroFile = new File(this.getClass().getResource("items.avro").toURI());
    ConfigSource config = this.config.deepCopy().set("avsc", avscFile.getPath());
    transaction(config, fileInput(avroFile));
    List<Object[]> records = Pages.toObjects(schema.toSchema(), output.pages);
    assertEquals(6, records.size());
    Object[] record = records.get(0);
    assertEquals(1L, record[0]);
    assertEquals(123456789012345678L, record[1]);
    assertEquals("Desktop", record[2]);
    assertEquals(true, record[4]);
    assertEquals("D", record[11]);
    assertEquals("[\"tag1\",\"tag2\"]", record[9].toString());
    assertEquals("bar", ((MapValue)record[10]).map().get(ValueFactory.newString("foo")).toString());
    assertEquals("opt1", ((MapValue)record[8]).map().get(ValueFactory.newString("key")).toString());
    assertEquals("2016-05-09T04:35:43+09:00", record[5].toString());
    assertNull(record[12]);
}
开发者ID:joker1007,项目名称:embulk-parser-avro,代码行数:40,代码来源:TestAvroParserPlugin.java
示例9: skipBrokenJson
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Feeds the parser the text {@code BROKEN} and checks that no record is
 * produced from it.
 */
@Test
public void skipBrokenJson()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE),
            column("_c3", STRING),
            column("_c4", TIMESTAMP),
            column("_c5", JSON));
    final ConfigSource baseCopy = this.config.deepCopy();
    final ConfigSource parserConfig = baseCopy.set("columns", columns);

    transaction(parserConfig, fileInput("BROKEN"));

    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(0, parsed.size());
}
开发者ID:hiroyuki-sato,项目名称:embulk-parser-jsonpath,代码行数:14,代码来源:TestJsonpathParserPlugin.java
示例10: skipBrokenColumn
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * With {@code stop_on_invalid_record} set to false, parses an object whose
 * TIMESTAMP column holds the text {@code INVALID} and checks that no record
 * is produced.
 */
@Test
public void skipBrokenColumn()
        throws Exception
{
    final SchemaConfig columns = schema(column("_c1", TIMESTAMP));
    final ConfigSource withColumns = this.config.deepCopy().set("columns", columns);
    final ConfigSource lenientConfig = withColumns.set("stop_on_invalid_record", false);

    transaction(lenientConfig, fileInput("{\"_c1\" : \"INVALID\"}"));

    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(0, parsed.size());
}
开发者ID:hiroyuki-sato,项目名称:embulk-parser-jsonpath,代码行数:13,代码来源:TestJsonpathParserPlugin.java
示例11: writeNils
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Parses a JSON array of four objects in which every column is either
 * missing or explicitly null and checks that all six columns of all four
 * records are null.
 */
@Test
public void writeNils()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE),
            column("_c3", STRING),
            column("_c4", TIMESTAMP),
            column("_c5", JSON));
    final ConfigSource baseCopy = this.config.deepCopy();
    final ConfigSource parserConfig = baseCopy.set("columns", columns);

    transaction(parserConfig, fileInput(
            "[",
            "{},",
            "{\"_c0\":null,\"_c1\":null,\"_c2\":null},",
            "{\"_c3\":null,\"_c4\":null,\"_c5\":null},",
            "{}",
            "]"));

    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(4, parsed.size());
    for (int row = 0; row < parsed.size(); row++) {
        final Object[] values = parsed.get(row);
        // Check each of the six schema columns in turn.
        for (int col = 0; col < 6; col++) {
            assertNull(values[col]);
        }
    }
}
开发者ID:hiroyuki-sato,项目名称:embulk-parser-jsonpath,代码行数:28,代码来源:TestJsonpathParserPlugin.java
示例12: useColumnOptions
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Loads the parser configuration from {@code use_column_options.yml},
 * parses two objects whose values are all given as strings and checks that
 * they are read as boolean, long and double values.
 */
@Test
public void useColumnOptions()
        throws Exception
{
    final SchemaConfig columns = schema(
            column("_c0", BOOLEAN),
            column("_c1", LONG),
            column("_c2", DOUBLE));
    final File optionsYaml = getResourceFile("use_column_options.yml");
    final ConfigSource parserConfig = getConfigFromYamlFile(optionsYaml);

    transaction(parserConfig, fileInput(
            "{\"_c0\":\"true\",\"_c1\":\"10\",\"_c2\":\"0.1\"}",
            "{\"_c0\":\"false\",\"_c1\":\"-10\",\"_c2\":\"1.0\"}"));

    final List<Object[]> parsed = Pages.toObjects(columns.toSchema(), output.pages);
    assertEquals(2, parsed.size());

    // First input line.
    final Object[] first = parsed.get(0);
    assertEquals(true, first[0]);
    assertEquals(10L, first[1]);
    assertEquals(0.1, (Double) first[2], 0.0001);

    // Second input line.
    final Object[] second = parsed.get(1);
    assertEquals(false, second[0]);
    assertEquals(-10L, second[1]);
    assertEquals(1.0, (Double) second[2], 0.0001);
}
开发者ID:shun0102,项目名称:embulk-parser-jsonl,代码行数:33,代码来源:TestJsonlParserPlugin.java
示例13: testExpandSpecialJsonValuesFromString
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Runs the expand_json filter over one input row whose JSON column
 * ({@code _c1}) is an empty string and checks that every output record keeps
 * {@code "_v0"} at position 0 and holds null at positions 1 and 2.
 *
 * <p>NOTE(review): the record count is not asserted, so the loop checks
 * nothing if the filter emits no record.
 */
@Test
public void testExpandSpecialJsonValuesFromString()
{
    final String configYaml =
            "type: expand_json\n"
            + "json_column_name: _c1\n"
            + "root: $.\n"
            + "expanded_columns:\n"
            + " - {name: _e0, type: string}\n"
            + " - {name: _e1, type: string}\n"; // the value will be null
    ConfigSource config = getConfigFromYaml(configYaml);
    final Schema schema = schema("_c0", STRING, "_c1", STRING);

    final Control verifier = new Control()
    {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = expandJsonFilterPlugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        "_v0", "")) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            for (Object[] row : Pages.toObjects(outputSchema, sink.pages)) {
                assertEquals("_v0", row[0]);
                assertNull(row[1]);
                assertNull(row[2]);
            }
        }
    };
    expandJsonFilterPlugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-expand_json,代码行数:39,代码来源:TestExpandJsonFilterPlugin.java
示例14: testUnchangedColumnValues
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Runs the expand_json filter with {@code _c6} as the JSON column over one
 * input row and checks that the values at positions 0 to 5 of the single
 * output record equal the input values for {@code _c0}..{@code _c5}.
 */
@Test
public void testUnchangedColumnValues()
{
    String configYaml =
            "type: expand_json\n"
            + "json_column_name: _c6\n"
            + "root: $.\n"
            + "expanded_columns:\n"
            + " - {name: _e0, type: string}\n";
    final ConfigSource config = getConfigFromYaml(configYaml);
    final Schema schema = schema("_c0", STRING, "_c1", BOOLEAN, "_c2", DOUBLE,
            "_c3", LONG, "_c4", TIMESTAMP, "_c5", JSON, "_c6", STRING);

    final Control verifier = new Control()
    {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = expandJsonFilterPlugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        "_v0", // _c0
                        true, // _c1
                        0.2, // _c2
                        3L, // _c3
                        Timestamp.ofEpochSecond(4), // _c4
                        newMapBuilder().put(s("_e0"), s("_v5")).build(), // _c5
                        "{\"_e0\":\"_v6\"}")) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            final List<Object[]> rows = Pages.toObjects(outputSchema, sink.pages);
            assertEquals(1, rows.size());

            final Object[] row = rows.get(0);
            assertEquals("_v0", row[0]);
            assertEquals(true, row[1]);
            assertEquals(0.2, (double) row[2], 0.0001);
            assertEquals(3L, row[3]);
            assertEquals(Timestamp.ofEpochSecond(4), row[4]);
            assertEquals(newMapBuilder().put(s("_e0"), s("_v5")).build(), row[5]);
        }
    };
    expandJsonFilterPlugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-expand_json,代码行数:49,代码来源:TestExpandJsonFilterPlugin.java
示例15: testExpandedJsonValuesWithKeepJsonColumns
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Runs the expand_json filter with {@code keep_expanding_json_column: true}
 * and checks that the output schema is {@code _c0}, {@code _c1}, {@code _e0}
 * (all STRING) and that each output record carries the original value, the
 * original JSON text and the expanded value.
 */
@Test
public void testExpandedJsonValuesWithKeepJsonColumns()
{
    final String configYaml =
            "type: expand_json\n"
            + "json_column_name: _c1\n"
            + "root: $.\n"
            + "expanded_columns:\n"
            + " - {name: _e0, type: string}\n"
            + "keep_expanding_json_column: true\n";
    ConfigSource config = getConfigFromYaml(configYaml);
    final Schema schema = schema("_c0", STRING, "_c1", STRING);

    final Control verifier = new Control()
    {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = expandJsonFilterPlugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        "_v0", "{\"_e0\":\"_ev0\"}")) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            assertEquals(3, outputSchema.getColumnCount());

            // Expected column names in output order; every column is a STRING.
            final String[] expectedNames = {"_c0", "_c1", "_e0"};
            for (int i = 0; i < expectedNames.length; i++) {
                final Column column = outputSchema.getColumn(i);
                assertTrue(column.getName().equals(expectedNames[i]) && column.getType().equals(STRING));
            }

            for (Object[] row : Pages.toObjects(outputSchema, sink.pages)) {
                assertEquals("_v0", row[0]);
                assertEquals("{\"_e0\":\"_ev0\"}", row[1]);
                assertEquals("_ev0", row[2]);
            }
        }
    };
    expandJsonFilterPlugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-expand_json,代码行数:54,代码来源:TestExpandJsonFilterPlugin.java
示例16: getRecords
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Converts the pages collected by {@code output} into records, using the
 * schema declared in the {@code parser} section of {@code config}.
 *
 * @param config configuration whose nested {@code parser} entry is loaded as a CSV parser task
 * @param output mock output holding the pages to convert
 * @return the records obtained from {@code Pages.toObjects}
 */
private List<Object[]> getRecords(ConfigSource config, MockPageOutput output)
{
    final CsvParserPlugin.PluginTask parserTask =
            config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class);
    final Schema parserSchema = parserTask.getSchemaConfig().toSchema();
    return Pages.toObjects(parserSchema, output.pages);
}
开发者ID:civitaspo,项目名称:embulk-input-hdfs,代码行数:6,代码来源:TestHdfsFileInputPlugin.java
示例17: testDistinctBySingleColumn
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Runs the distinct filter keyed on {@code _c0} over eight rows and checks
 * that exactly three records remain: ("a", "a"), ("b", "b") and (null, "a").
 */
@Test
public void testDistinctBySingleColumn()
{
    final String yaml =
            "type: distinct\n"
            + "columns: [_c0]\n";
    final ConfigSource config = loadConfigFromYaml(yaml);

    final Control verifier = new Control() {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = plugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        "a", "a", // row: 1
                        "a", "a", // row: 2
                        "a", "b", // row: 3
                        "b", "b", // row: 4
                        "b", "a", // row: 5
                        "b", "b", // row: 6
                        null, "a", // row: 7
                        null, "b" // row: 8
                )) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            final List<Object[]> rows = Pages.toObjects(outputSchema, sink.pages);
            assertEquals(3, rows.size());

            final Object[] first = rows.get(0);
            final Object[] second = rows.get(1);
            final Object[] third = rows.get(2);

            assertEquals("a", first[0]);
            assertEquals("a", first[1]);

            assertEquals("b", second[0]);
            assertEquals("b", second[1]);

            assertNull(third[0]);
            assertEquals("a", third[1]);
        }
    };
    plugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-distinct,代码行数:49,代码来源:TestDistinctFilterPlugin.java
示例18: testDistinctByMultipleColumns
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Runs the distinct filter keyed on {@code _c0} and {@code _c1} over eight
 * rows and checks that exactly six records remain, in the order in which
 * each distinct pair appears in the input.
 */
@Test
public void testDistinctByMultipleColumns()
{
    final String yaml =
            "type: distinct\n"
            + "columns: [_c0, _c1]\n";
    final ConfigSource config = loadConfigFromYaml(yaml);

    final Control verifier = new Control() {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = plugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        "a", "a", // row: 1
                        "a", "a", // row: 2
                        "a", "b", // row: 3
                        "b", "b", // row: 4
                        "b", "a", // row: 5
                        "b", "b", // row: 6
                        null, "a", // row: 7
                        null, "b" // row: 8
                )) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            final List<Object[]> rows = Pages.toObjects(outputSchema, sink.pages);
            assertEquals(6, rows.size());

            final Object[] first = rows.get(0);
            final Object[] second = rows.get(1);
            final Object[] third = rows.get(2);
            final Object[] fourth = rows.get(3);
            final Object[] fifth = rows.get(4);
            final Object[] sixth = rows.get(5);

            assertEquals("a", first[0]);
            assertEquals("a", first[1]);

            assertEquals("a", second[0]);
            assertEquals("b", second[1]);

            assertEquals("b", third[0]);
            assertEquals("b", third[1]);

            assertEquals("b", fourth[0]);
            assertEquals("a", fourth[1]);

            assertNull(fifth[0]);
            assertEquals("a", fifth[1]);

            assertNull(sixth[0]);
            assertEquals("b", sixth[1]);
        }
    };
    plugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-distinct,代码行数:61,代码来源:TestDistinctFilterPlugin.java
示例19: testDistinctByLongColumn
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Replaces the shared schema with a LONG/STRING pair, runs the distinct
 * filter keyed on {@code _c0} over eight rows and checks that exactly three
 * records remain: (1, "a"), (2, "b") and (null, "a").
 */
@Test
public void testDistinctByLongColumn()
{
    schema = schema("_c0", Types.LONG, "_c1", Types.STRING);
    final String yaml =
            "type: distinct\n"
            + "columns: [_c0]\n";
    final ConfigSource config = loadConfigFromYaml(yaml);

    final Control verifier = new Control() {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = plugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        1L, "a", // row: 1
                        1L, "a", // row: 2
                        1L, "b", // row: 3
                        2L, "b", // row: 4
                        2L, "a", // row: 5
                        2L, "b", // row: 6
                        null, "a", // row: 7
                        null, "b" // row: 8
                )) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            final List<Object[]> rows = Pages.toObjects(outputSchema, sink.pages);
            assertEquals(3, rows.size());

            final Object[] first = rows.get(0);
            final Object[] second = rows.get(1);
            final Object[] third = rows.get(2);

            assertEquals(1L, first[0]);
            assertEquals("a", first[1]);

            assertEquals(2L, second[0]);
            assertEquals("b", second[1]);

            assertNull(third[0]);
            assertEquals("a", third[1]);
        }
    };
    plugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-distinct,代码行数:50,代码来源:TestDistinctFilterPlugin.java
示例20: testDistinctByDoubleColumn
import org.embulk.spi.util.Pages; //导入依赖的package包/类
/**
 * Replaces the shared schema with a DOUBLE/STRING pair, runs the distinct
 * filter keyed on {@code _c0} over eight rows and checks that exactly three
 * records remain: (1.1, "a"), (2.2, "b") and (null, "a").
 */
@Test
public void testDistinctByDoubleColumn()
{
    schema = schema("_c0", Types.DOUBLE, "_c1", Types.STRING);
    final String yaml =
            "type: distinct\n"
            + "columns: [_c0]\n";
    final ConfigSource config = loadConfigFromYaml(yaml);

    final Control verifier = new Control() {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            final MockPageOutput sink = new MockPageOutput();
            try (PageOutput filtered = plugin.open(taskSource, schema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
                        1.1, "a", // row: 1
                        1.1, "a", // row: 2
                        1.1, "b", // row: 3
                        2.2, "b", // row: 4
                        2.2, "a", // row: 5
                        2.2, "b", // row: 6
                        null, "a", // row: 7
                        null, "b" // row: 8
                )) {
                    filtered.add(inputPage);
                }
                filtered.finish();
            }

            final List<Object[]> rows = Pages.toObjects(outputSchema, sink.pages);
            assertEquals(3, rows.size());

            final Object[] first = rows.get(0);
            final Object[] second = rows.get(1);
            final Object[] third = rows.get(2);

            assertEquals(1.1, first[0]);
            assertEquals("a", first[1]);

            assertEquals(2.2, second[0]);
            assertEquals("b", second[1]);

            assertNull(third[0]);
            assertEquals("a", third[1]);
        }
    };
    plugin.transaction(config, schema, verifier);
}
开发者ID:civitaspo,项目名称:embulk-filter-distinct,代码行数:50,代码来源:TestDistinctFilterPlugin.java
注：本文中的org.embulk.spi.util.Pages类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论