本文整理汇总了Java中org.apache.flume.serialization.EventSerializer类的典型用法代码示例。如果您正苦于以下问题:Java EventSerializer类的具体用法?Java EventSerializer怎么用?Java EventSerializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
EventSerializer类属于org.apache.flume.serialization包,在下文中一共展示了EventSerializer类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testWithNewline
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Test
public void testWithNewline() throws FileNotFoundException, IOException {
Map<String, String> headers = new HashMap<String, String>();
headers.put("message", "message1");
OutputStream out = new FileOutputStream(testFile);
CustomLastfmHeaderAndBodyTextEventSerializer.Builder builder = CustomLastfmHeaderAndBodyTextEventSerializer.builder();
EventSerializer serializer = builder.build(new Context(), out);
serializer.afterCreate();
serializer.write(EventBuilder.withBody("messageBody", Charsets.UTF_8, headers));
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
BufferedReader reader = new BufferedReader(new FileReader(testFile));
Assert.assertEquals("message1", reader.readLine());
Assert.assertNull(reader.readLine());
reader.close();
FileUtils.forceDelete(testFile);
}
开发者ID:sequenceiq,项目名称:sequenceiq-samples,代码行数:23,代码来源:CustomLastfmHeaderAndBodyTextEventSerializerTest.java
示例2: testNoNewline
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Test
public void testNoNewline() throws FileNotFoundException, IOException {
Map<String, String> headers = new HashMap<String, String>();
headers.put("header1", "value1");
OutputStream out = new FileOutputStream(testFile);
Context context = new Context();
context.put("appendNewline", "false");
CustomLastfmHeaderAndBodyTextEventSerializer.Builder builder = CustomLastfmHeaderAndBodyTextEventSerializer.builder();
EventSerializer serializer = builder.build(new Context(), out);
serializer.afterCreate();
serializer.write(EventBuilder.withBody("event 1", Charsets.UTF_8, headers));
serializer.write(EventBuilder.withBody("event 2", Charsets.UTF_8, headers));
serializer.write(EventBuilder.withBody("event 3", Charsets.UTF_8, headers));
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
BufferedReader reader = new BufferedReader(new FileReader(testFile));
Assert.assertNull(reader.readLine());
reader.close();
FileUtils.forceDelete(testFile);
}
开发者ID:sequenceiq,项目名称:sequenceiq-samples,代码行数:27,代码来源:CustomLastfmHeaderAndBodyTextEventSerializerTest.java
示例3: configure
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public void configure(Context context) {
super.configure(context);
serializerType = context.getString("serializer", "TEXT");
useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
false);
serializerContext =
new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+ useRawLocalFileSystem);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:13,代码来源:HDFSDataStream.java
示例4: configure
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public void configure(Context context) {
super.configure(context);
serializerType = context.getString("serializer", "TEXT");
useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
false);
serializerContext = new Context(
context.getSubProperties(EventSerializer.CTX_PREFIX));
logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+ useRawLocalFileSystem);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:13,代码来源:HDFSCompressedDataStream.java
示例5: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
path = context.getString(PATH, PATH_DEFAULT);
customerHeader = context.getString(CUSTOMER_HEADER, CUSTOMER_HEADER_DEFAULT);
hostHeader = context.getString(HOST_HEADER, HOST_HEADER_DEFAULT);
SyslogAvroEventSerializer writer = null;
try {
writer = new SyslogAvroEventSerializer(out, path, customerHeader, hostHeader);
writer.configure(context);
} catch (IOException e) {
log.error("Unable to parse schema file. Exception follows.", e);
}
return writer;
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:15,代码来源:SyslogAvroEventSerializer.java
示例6: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
ApacheLogAvroEventSerializer writer = null;
try {
writer = new ApacheLogAvroEventSerializer(out);
writer.configure(context);
} catch (IOException e) {
log.error("Unable to parse schema file. Exception follows.", e);
}
return writer;
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:12,代码来源:ApacheLogAvroEventSerializer.java
示例7: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
JavaLogAvroEventSerializer writer = null;
try {
writer = new JavaLogAvroEventSerializer(out);
writer.configure(context);
} catch (IOException e) {
log.error("Unable to parse schema file. Exception follows.", e);
}
return writer;
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:12,代码来源:JavaLogAvroEventSerializer.java
示例8: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
FlumeEventAvroEventSerializer writer = new FlumeEventAvroEventSerializer(out);
writer.configure(context);
return writer;
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:7,代码来源:FlumeEventAvroEventSerializer.java
示例9: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
AvroEventSerializer writer = new AvroEventSerializer(out);
writer.configure(context);
return writer;
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:7,代码来源:AvroEventSerializer.java
示例10: createAvroFile
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
public void createAvroFile(File file, String codec, boolean useSchemaUrl,
boolean useStaticSchemaUrl) throws IOException {
// serialize a few events using the reflection-based avro serializer
OutputStream out = new FileOutputStream(file);
Context ctx = new Context();
if (codec != null) {
ctx.put("compressionCodec", codec);
}
Schema schema = Schema.createRecord("myrecord", null, null, false);
schema.setFields(Arrays.asList(new Schema.Field[]{
new Schema.Field("message", Schema.create(Schema.Type.STRING), null, null)
}));
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
File schemaFile = null;
if (useSchemaUrl || useStaticSchemaUrl) {
schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
}
if (useStaticSchemaUrl) {
ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL,
schemaFile.toURI().toURL().toExternalForm());
}
EventSerializer.Builder builder = new AvroEventSerializer.Builder();
EventSerializer serializer = builder.build(ctx, out);
serializer.afterCreate();
for (int i = 0; i < 3; i++) {
GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
Event event = EventBuilder.withBody(serializeAvro(record, schema));
if (schemaFile == null && !useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
schema.toString());
} else if (useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
schemaFile.toURI().toURL().toExternalForm());
}
serializer.write(event);
}
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
if (schemaFile != null ) {
schemaFile.delete();
}
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:52,代码来源:TestAvroEventSerializer.java
示例11: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
MessageAvroEventSerializer writer = new MessageAvroEventSerializer(out);
writer.configure(context);
return writer;
}
开发者ID:jcustenborder,项目名称:hadoop-goldengate,代码行数:7,代码来源:MessageAvroEventSerializer.java
示例12: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
return new CustomLastfmHeaderAndBodyTextEventSerializer(out, context);
}
开发者ID:sequenceiq,项目名称:sequenceiq-samples,代码行数:5,代码来源:CustomLastfmHeaderAndBodyTextEventSerializer.java
示例13: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
AvroKaaEventSerializer writer = new AvroKaaEventSerializer(out);
writer.configure(context);
return writer;
}
开发者ID:kaaproject,项目名称:kaa,代码行数:7,代码来源:AvroKaaEventSerializer.java
示例14: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
CSVAvroSerializer ser = new CSVAvroSerializer(out);
ser.configure(context);
return ser;
}
开发者ID:mpercy,项目名称:flume-rtq-hadoop-summit-2013,代码行数:7,代码来源:CSVAvroSerializer.java
示例15: build
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Override
public EventSerializer build(Context context, OutputStream out) {
FlumeEventStringAvroEventSerializer writer = new FlumeEventStringAvroEventSerializer(out);
writer.configure(context);
return writer;
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:7,代码来源:FlumeEventStringAvroEventSerializer.java
示例16: test
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Test
public void test() throws FileNotFoundException, IOException {
// create the file, write some data
OutputStream out = new FileOutputStream(testFile);
String builderName = JavaLogAvroEventSerializer.Builder.class.getName();
Context ctx = new Context();
ctx.put("syncInterval", "4096");
EventSerializer serializer =
EventSerializerFactory.getInstance(builderName, ctx, out);
serializer.afterCreate(); // must call this when a file is newly created
List<Event> events = generateJavaEvents();
for (Event e : events) {
serializer.write(e);
}
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
// now try to read the file back
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> fileReader =
new DataFileReader<GenericRecord>(testFile, reader);
GenericRecord record = new GenericData.Record(fileReader.getSchema());
int numEvents = 0;
while (fileReader.hasNext()) {
fileReader.next(record);
long timestamp = (Long) record.get("timestamp");
String datetime = record.get("datetime").toString();
String classname = record.get("classname").toString();
String message = record.get("message").toString();
System.out.println(classname + ": " + message + " (at " + datetime + ")");
numEvents++;
}
fileReader.close();
Assert.assertEquals("Should have found a total of 4 events", 4, numEvents);
FileUtils.forceDelete(testFile);
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:48,代码来源:TestJavaAvroEventSerializer.java
示例17: test
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Test
public void test() throws FileNotFoundException, IOException {
// create the file, write some data
OutputStream out = new FileOutputStream(testFile);
String builderName = SyslogAvroEventSerializer.Builder.class.getName();
Context ctx = new Context();
ctx.put("syncInterval", "4096");
ctx.put("path", "src/test/resources/customerToHostsFile.txt");
EventSerializer serializer =
EventSerializerFactory.getInstance(builderName, ctx, out);
serializer.afterCreate(); // must call this when a file is newly created
List<Event> events = generateSyslogEvents();
for (Event e : events) {
serializer.write(e);
}
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
// now try to read the file back
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> fileReader =
new DataFileReader<GenericRecord>(testFile, reader);
GenericRecord record = new GenericData.Record(fileReader.getSchema());
int numEvents = 0;
while (fileReader.hasNext()) {
fileReader.next(record);
long timestamp = (Long) record.get("timestamp");
String datetime = record.get("datetime").toString();
String hostname = record.get("hostname").toString();
Map<String, String> headers = (Map<String, String>) record.get("headers");
String message = record.get("message").toString();
System.out.println(hostname + " (" + headers + ")" + ": " + message);
numEvents++;
}
fileReader.close();
Assert.assertEquals("Should have found a total of 6 events", 6, numEvents);
FileUtils.forceDelete(testFile);
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:50,代码来源:TestSyslogAvroEventSerializer.java
示例18: test
import org.apache.flume.serialization.EventSerializer; //导入依赖的package包/类
@Test
public void test() throws FileNotFoundException, IOException {
// create the file, write some data
OutputStream out = new FileOutputStream(testFile);
String builderName = ApacheLogAvroEventSerializer.Builder.class.getName();
Context ctx = new Context();
ctx.put("syncInterval", "4096");
EventSerializer serializer =
EventSerializerFactory.getInstance(builderName, ctx, out);
serializer.afterCreate(); // must call this when a file is newly created
List<Event> events = generateApacheEvents();
for (Event e : events) {
serializer.write(e);
}
serializer.flush();
serializer.beforeClose();
out.flush();
out.close();
// now try to read the file back
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> fileReader =
new DataFileReader<GenericRecord>(testFile, reader);
GenericRecord record = new GenericData.Record(fileReader.getSchema());
int numEvents = 0;
while (fileReader.hasNext()) {
fileReader.next(record);
String ip = record.get("ip").toString();
String uri = record.get("uri").toString();
Integer statuscode = (Integer) record.get("statuscode");
String original = record.get("original").toString();
String connectionstatus = record.get("connectionstatus").toString();
Assert.assertEquals("Ip should be 80.79.194.3", "80.79.194.3", ip);
System.out.println("IP " + ip + " requested: " + uri + " with status code " + statuscode + " and connectionstatus: " + connectionstatus);
System.out.println("Original logline: " + original);
numEvents++;
}
fileReader.close();
Assert.assertEquals("Should have found a total of 3 events", 2, numEvents);
FileUtils.forceDelete(testFile);
}
开发者ID:DandyDev,项目名称:flume-plugins,代码行数:51,代码来源:TestApacheAvroEventSerializer.java
注:本文中的org.apache.flume.serialization.EventSerializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论