plaso-rubanetra/plaso/output/dynamic_test.py
2020-04-06 18:48:34 +02:00

132 lines
4.4 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for plaso.output.l2t_csv."""
import StringIO
import unittest
from plaso.formatters import interface as formatters_interface
from plaso.lib import event
from plaso.lib import eventdata
from plaso.output import dynamic
class TestEvent(event.EventObject):
DATA_TYPE = 'test:dynamic'
def __init__(self):
super(TestEvent, self).__init__()
self.timestamp = 1340821021000000
self.timestamp_desc = eventdata.EventTimestamp.CHANGE_TIME
self.hostname = 'ubuntu'
self.filename = 'log/syslog.1'
self.text = (
u'Reporter <CRON> PID: 8442 (pam_unix(cron:session): session\n '
u'closed for user root)')
class TestEventFormatter(formatters_interface.EventFormatter):
DATA_TYPE = 'test:dynamic'
FORMAT_STRING = u'{text}'
SOURCE_SHORT = 'LOG'
SOURCE_LONG = 'Syslog'
class FakeFilter(object):
"""Provide a fake filter, that defines which fields to use."""
def __init__(self, fields, separator=u','):
self.fields = fields
self.separator = separator
class DynamicTest(unittest.TestCase):
"""Test the dynamic output module."""
def testHeader(self):
output = StringIO.StringIO()
formatter = dynamic.Dynamic(None, output)
correct_line = (
'datetime,timestamp_desc,source,source_long,message,parser,'
'display_name,tag,store_number,store_index\n')
formatter.Start()
self.assertEquals(output.getvalue(), correct_line)
output = StringIO.StringIO()
formatter = dynamic.Dynamic(None, output, filter_use=FakeFilter(
['date', 'time', 'message', 'hostname', 'filename', 'some_stuff']))
correct_line = 'date,time,message,hostname,filename,some_stuff\n'
formatter.Start()
self.assertEquals(output.getvalue(), correct_line)
output = StringIO.StringIO()
formatter = dynamic.Dynamic(None, output, filter_use=FakeFilter(
['date', 'time', 'message', 'hostname', 'filename', 'some_stuff'],
'@'))
correct_line = 'date@time@message@hostname@filename@some_stuff\n'
formatter.Start()
self.assertEquals(output.getvalue(), correct_line)
def testEventBody(self):
"""Test ensures that returned lines returned are fmt CSV as expected."""
event_object = TestEvent()
output = StringIO.StringIO()
formatter = dynamic.Dynamic(None, output, filter_use=FakeFilter(
['date', 'time', 'timezone', 'macb', 'source', 'sourcetype', 'type',
'user', 'host', 'message_short', 'message', 'filename',
'inode', 'notes', 'format', 'extra']))
formatter.Start()
header = (
'date,time,timezone,macb,source,sourcetype,type,user,host,'
'message_short,message,filename,inode,notes,format,extra\n')
self.assertEquals(output.getvalue(), header)
formatter.EventBody(event_object)
correct = (
'2012-06-27,18:17:01,UTC,..C.,LOG,Syslog,Metadata Modification Time,-,'
'ubuntu,Reporter <CRON> PID: 8442 (pam_unix(cron:session): session '
'closed for user root),Reporter <CRON> PID: 8442 '
'(pam_unix(cron:session): session closed for user root),log/syslog.1'
',-,-,-,-\n')
self.assertEquals(output.getvalue(), header + correct)
output = StringIO.StringIO()
formatter = dynamic.Dynamic(None, output, filter_use=FakeFilter(
['datetime', 'nonsense', 'hostname', 'message']))
header = 'datetime,nonsense,hostname,message\n'
formatter.Start()
self.assertEquals(output.getvalue(), header)
correct = (
'2012-06-27T18:17:01+00:00,-,ubuntu,Reporter <CRON> PID: 8442'
' (pam_unix(cron:session): session closed for user root)\n')
formatter.EventBody(event_object)
self.assertEquals(output.getvalue(), header + correct)
if __name__ == '__main__':
unittest.main()