341 lines
12 KiB
Python
341 lines
12 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2012 The Plaso Project Authors.
|
|
# Please see the AUTHORS file for details on individual authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""This file contains the tests for the event storage."""
|
|
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
import zipfile
|
|
|
|
from plaso.engine import queue
|
|
from plaso.events import text_events
|
|
from plaso.events import windows_events
|
|
from plaso.formatters import manager as formatters_manager
|
|
from plaso.lib import event
|
|
from plaso.lib import eventdata
|
|
from plaso.lib import pfilter
|
|
from plaso.lib import storage
|
|
from plaso.lib import timelib_test
|
|
from plaso.multi_processing import multi_process
|
|
from plaso.formatters import winreg # pylint: disable=unused-import
|
|
from plaso.serializer import protobuf_serializer
|
|
|
|
|
|
class DummyObject(object):
|
|
"""Dummy object."""
|
|
|
|
|
|
class GroupMock(object):
|
|
"""Mock a class for grouping events together."""
|
|
def __init__(self):
|
|
self.groups = []
|
|
|
|
def AddGroup(self, name, events, desc=None, first=0, last=0, color=None,
|
|
cat=None):
|
|
"""Add a new group of events."""
|
|
self.groups.append((name, events, desc, first, last, color, cat))
|
|
|
|
def __iter__(self):
|
|
"""Iterator."""
|
|
for name, events, desc, first, last, color, cat in self.groups:
|
|
dummy = DummyObject()
|
|
dummy.name = name
|
|
dummy.events = events
|
|
if desc:
|
|
dummy.description = desc
|
|
if first:
|
|
dummy.first_timestamp = int(first)
|
|
if last:
|
|
dummy.last_timestamp = int(last)
|
|
if color:
|
|
dummy.color = color
|
|
if cat:
|
|
dummy.category = cat
|
|
|
|
yield dummy
|
|
|
|
|
|
class TempDirectory(object):
|
|
"""A self cleaning temporary directory."""
|
|
|
|
def __init__(self):
|
|
"""Initializes the temporary directory."""
|
|
super(TempDirectory, self).__init__()
|
|
self.name = u''
|
|
|
|
def __enter__(self):
|
|
"""Make this work with the 'with' statement."""
|
|
self.name = tempfile.mkdtemp()
|
|
return self.name
|
|
|
|
def __exit__(self, unused_type, unused_value, unused_traceback):
|
|
"""Make this work with the 'with' statement."""
|
|
shutil.rmtree(self.name, True)
|
|
|
|
|
|
class StorageFileTest(unittest.TestCase):
|
|
"""Tests for the plaso storage file."""
|
|
|
|
def setUp(self):
|
|
"""Sets up the needed objects used throughout the test."""
|
|
self._event_objects = []
|
|
|
|
# TODO: replace hardcoded timestamps by timelib_test.CopyStringToTimestamp.
|
|
event_1 = windows_events.WindowsRegistryEvent(
|
|
13349615269295969, u'MY AutoRun key', {u'Value': u'c:/Temp/evil.exe'})
|
|
event_1.parser = 'UNKNOWN'
|
|
|
|
event_2 = windows_events.WindowsRegistryEvent(
|
|
13359662069295961, u'\\HKCU\\Secret\\EvilEmpire\\Malicious_key',
|
|
{u'Value': u'send all the exes to the other world'})
|
|
event_2.parser = 'UNKNOWN'
|
|
|
|
event_3 = windows_events.WindowsRegistryEvent(
|
|
13349402860000000, u'\\HKCU\\Windows\\Normal',
|
|
{u'Value': u'run all the benign stuff'})
|
|
event_3.parser = 'UNKNOWN'
|
|
|
|
text_dict = {'text': (
|
|
'This is a line by someone not reading the log line properly. And '
|
|
'since this log line exceeds the accepted 80 chars it will be '
|
|
'shortened.'), 'hostname': 'nomachine', 'username': 'johndoe'}
|
|
event_4 = text_events.TextEvent(12389344590000000, 12, text_dict)
|
|
event_4.parser = 'UNKNOWN'
|
|
|
|
self._event_objects.append(event_1)
|
|
self._event_objects.append(event_2)
|
|
self._event_objects.append(event_3)
|
|
self._event_objects.append(event_4)
|
|
|
|
def testStorageWriter(self):
|
|
"""Test the storage writer."""
|
|
self.assertEquals(len(self._event_objects), 4)
|
|
|
|
# The storage writer is normally run in a separate thread.
|
|
# For the purpose of this test it has to be run in sequence,
|
|
# hence the call to WriteEventObjects after all the event objects
|
|
# have been queued up.
|
|
|
|
# TODO: add upper queue limit.
|
|
test_queue = multi_process.MultiProcessingQueue()
|
|
test_queue_producer = queue.ItemQueueProducer(test_queue)
|
|
test_queue_producer.ProduceItems(self._event_objects)
|
|
test_queue_producer.SignalEndOfInput()
|
|
|
|
with tempfile.NamedTemporaryFile() as temp_file:
|
|
storage_writer = storage.StorageFileWriter(test_queue, temp_file)
|
|
storage_writer.WriteEventObjects()
|
|
|
|
z_file = zipfile.ZipFile(temp_file, 'r', zipfile.ZIP_DEFLATED)
|
|
|
|
expected_z_filename_list = [
|
|
'plaso_index.000001', 'plaso_meta.000001', 'plaso_proto.000001',
|
|
'plaso_timestamps.000001', 'serializer.txt']
|
|
|
|
z_filename_list = sorted(z_file.namelist())
|
|
self.assertEquals(len(z_filename_list), 5)
|
|
self.assertEquals(z_filename_list, expected_z_filename_list)
|
|
|
|
def testStorage(self):
|
|
"""Test the storage object."""
|
|
event_objects = []
|
|
timestamps = []
|
|
group_mock = GroupMock()
|
|
tags = []
|
|
tags_mock = []
|
|
groups = []
|
|
group_events = []
|
|
same_events = []
|
|
|
|
serializer = protobuf_serializer.ProtobufEventObjectSerializer
|
|
|
|
with TempDirectory() as dirname:
|
|
temp_file = os.path.join(dirname, 'plaso.db')
|
|
store = storage.StorageFile(temp_file)
|
|
store.AddEventObjects(self._event_objects)
|
|
|
|
# Add tagging.
|
|
tag_1 = event.EventTag()
|
|
tag_1.store_index = 0
|
|
tag_1.store_number = 1
|
|
tag_1.comment = 'My comment'
|
|
tag_1.color = 'blue'
|
|
tags_mock.append(tag_1)
|
|
|
|
tag_2 = event.EventTag()
|
|
tag_2.store_index = 1
|
|
tag_2.store_number = 1
|
|
tag_2.tags = ['Malware']
|
|
tag_2.color = 'red'
|
|
tags_mock.append(tag_2)
|
|
|
|
tag_3 = event.EventTag()
|
|
tag_3.store_number = 1
|
|
tag_3.store_index = 2
|
|
tag_3.comment = 'This is interesting'
|
|
tag_3.tags = ['Malware', 'Benign']
|
|
tag_3.color = 'red'
|
|
tags_mock.append(tag_3)
|
|
|
|
store.StoreTagging(tags_mock)
|
|
|
|
# Add additional tagging, second round.
|
|
tag_4 = event.EventTag()
|
|
tag_4.store_index = 1
|
|
tag_4.store_number = 1
|
|
tag_4.tags = ['Interesting']
|
|
|
|
store.StoreTagging([tag_4])
|
|
|
|
group_mock.AddGroup(
|
|
'Malicious', [(1, 1), (1, 2)], desc='Events that are malicious',
|
|
color='red', first=13349402860000000, last=13349615269295969,
|
|
cat='Malware')
|
|
store.StoreGrouping(group_mock)
|
|
store.Close()
|
|
|
|
read_store = storage.StorageFile(temp_file, read_only=True)
|
|
|
|
self.assertTrue(read_store.HasTagging())
|
|
self.assertTrue(read_store.HasGrouping())
|
|
|
|
for event_object in read_store.GetEntries(1):
|
|
event_objects.append(event_object)
|
|
timestamps.append(event_object.timestamp)
|
|
if event_object.data_type == 'windows:registry:key_value':
|
|
self.assertEquals(event_object.timestamp_desc,
|
|
eventdata.EventTimestamp.WRITTEN_TIME)
|
|
else:
|
|
self.assertEquals(event_object.timestamp_desc,
|
|
eventdata.EventTimestamp.WRITTEN_TIME)
|
|
|
|
for tag in read_store.GetTagging():
|
|
event_object = read_store.GetTaggedEvent(tag)
|
|
tags.append(event_object)
|
|
|
|
groups = list(read_store.GetGrouping())
|
|
self.assertEquals(len(groups), 1)
|
|
group_events = list(read_store.GetEventsFromGroup(groups[0]))
|
|
|
|
# Read the same events that were put in the group, just to compare
|
|
# against.
|
|
event_object = read_store.GetEventObject(1, 1)
|
|
serialized_event_object = serializer.WriteSerialized(event_object)
|
|
same_events.append(serialized_event_object)
|
|
|
|
event_object = read_store.GetEventObject(1, 2)
|
|
serialized_event_object = serializer.WriteSerialized(event_object)
|
|
same_events.append(serialized_event_object)
|
|
|
|
self.assertEquals(len(event_objects), 4)
|
|
self.assertEquals(len(tags), 4)
|
|
|
|
self.assertEquals(tags[0].timestamp, 12389344590000000)
|
|
self.assertEquals(tags[0].store_number, 1)
|
|
self.assertEquals(tags[0].store_index, 0)
|
|
self.assertEquals(tags[0].tag.comment, u'My comment')
|
|
self.assertEquals(tags[0].tag.color, u'blue')
|
|
|
|
msg, _ = formatters_manager.EventFormatterManager.GetMessageStrings(tags[0])
|
|
self.assertEquals(msg[0:10], u'This is a ')
|
|
|
|
self.assertEquals(tags[1].tag.tags[0], 'Malware')
|
|
msg, _ = formatters_manager.EventFormatterManager.GetMessageStrings(tags[1])
|
|
self.assertEquals(msg[0:15], u'[\\HKCU\\Windows\\')
|
|
|
|
self.assertEquals(tags[2].tag.comment, u'This is interesting')
|
|
self.assertEquals(tags[2].tag.tags[0], 'Malware')
|
|
self.assertEquals(tags[2].tag.tags[1], 'Benign')
|
|
|
|
self.assertEquals(tags[2].parser, 'UNKNOWN')
|
|
|
|
# Test the newly added fourth tag, which should include data from
|
|
# the first version as well.
|
|
self.assertEquals(tags[3].tag.tags[0], 'Interesting')
|
|
self.assertEquals(tags[3].tag.tags[1], 'Malware')
|
|
|
|
expected_timestamps = [
|
|
12389344590000000, 13349402860000000, 13349615269295969,
|
|
13359662069295961]
|
|
self.assertEquals(timestamps, expected_timestamps)
|
|
|
|
self.assertEquals(groups[0].name, u'Malicious')
|
|
self.assertEquals(groups[0].category, u'Malware')
|
|
self.assertEquals(groups[0].color, u'red')
|
|
self.assertEquals(groups[0].description, u'Events that are malicious')
|
|
self.assertEquals(groups[0].first_timestamp, 13349402860000000)
|
|
self.assertEquals(groups[0].last_timestamp, 13349615269295969)
|
|
|
|
self.assertEquals(len(group_events), 2)
|
|
self.assertEquals(group_events[0].timestamp, 13349402860000000)
|
|
self.assertEquals(group_events[1].timestamp, 13349615269295969L)
|
|
|
|
proto_group_events = []
|
|
for group_event in group_events:
|
|
serialized_event_object = serializer.WriteSerialized(group_event)
|
|
proto_group_events.append(serialized_event_object)
|
|
|
|
self.assertEquals(same_events, proto_group_events)
|
|
|
|
|
|
class StoreStorageTest(unittest.TestCase):
|
|
"""Test sorting storage file,"""
|
|
|
|
def setUp(self):
|
|
"""Setup sets parameters that will be reused throughout this test."""
|
|
# TODO: have sample output generated from the test.
|
|
# TODO: Use input data with a defined year. syslog parser chooses a
|
|
# year based on system clock; forcing updates to test file if regenerated.
|
|
self.test_file = os.path.join('test_data', 'psort_test.out')
|
|
self.first = timelib_test.CopyStringToTimestamp('2012-07-20 15:44:14')
|
|
self.last = timelib_test.CopyStringToTimestamp('2016-11-18 01:15:43')
|
|
|
|
def testStorageSort(self):
|
|
"""This test ensures that items read and output are in the expected order.
|
|
|
|
This method by design outputs data as it runs. In order to test this a
|
|
a modified output renderer is used for which the flush functionality has
|
|
been removed.
|
|
|
|
The test will be to read the TestEventBuffer storage and check to see
|
|
if it matches the known good sort order.
|
|
"""
|
|
pfilter.TimeRangeCache.ResetTimeConstraints()
|
|
pfilter.TimeRangeCache.SetUpperTimestamp(self.last)
|
|
pfilter.TimeRangeCache.SetLowerTimestamp(self.first)
|
|
store = storage.StorageFile(self.test_file, read_only=True)
|
|
|
|
store.store_range = [1, 5, 6]
|
|
|
|
read_list = []
|
|
event_object = store.GetSortedEntry()
|
|
while event_object:
|
|
read_list.append(event_object.timestamp)
|
|
event_object = store.GetSortedEntry()
|
|
|
|
expected_timestamps = [
|
|
1344270407000000L, 1392438730000000L, 1427151678000000L,
|
|
1451584472000000L]
|
|
|
|
self.assertEquals(read_list, expected_timestamps)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|