Look for time buckets with abnormally large numbers of messages. Finds abnormally large time buckets and saves them to an HDFS file.
Entire code:
// Take the large buckets only
val abnormal = buckets.filter(_.length >= threshold)
// Examine their sizes
val sizes = abnormal.map(_.length).collect()
var sum = 0
var i = 0
var largest = 0
var largestIndex = 0
while (i < sizes.length) {
  println((i + 1) + " " + sizes(i))
  sum += sizes(i)
  if (sizes(i) > largest) {
    largestIndex = i
    largest = sizes(i)
  }
  i += 1
}
println("Total " + sum + " messages in " + i + " buckets, largest bucket " + largest)
println("Saving largest bucket(s) of size " + largest + " as text")
val largestbuckets = abnormal.filter(_.length >= largest)
val strbuckets = largestbuckets.map(x => {
  var str = ""
  for (k <- x) {
    str += k.mkString(" ")
    str += "\n"
  }
  str += "\n"
  str
})
strbuckets.saveAsTextFile(hdfsDir + "largest-buckets")
Usage:
import spark._
import spark.SparkContext._
import spark.timeseries._

/** tbird datafile: */
val hdfsDir = "hdfs://server.name.org:54310/user/hadoop/"
val fileName = "tbird-tagged"
/** The Mesos master server to use: */
val mesosMaster = "mesos://master@host.name.org:5050"
/** Initialize Spark and set caching class: */
val sc = TimeSeriesSpark.init(mesosMaster)
/** Bucket by hour: */
val buckets = TimeSeriesSpark.bucketLogsByHour(sc, hdfsDir + fileName)
val threshold = 100000
// Save abnormally large buckets:
SuperComputerLogs.abnormallyLargeBuckets(hdfsDir, buckets, threshold)
hdfsDir: the HDFS directory to save the abnormally large time bucket files to.
buckets: the BucketRDD whose messages are examined.
threshold: the minimum number of messages for a bucket to be considered abnormally large.
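For orientation, the fragment above could be wrapped into the documented method roughly as follows. This is a minimal sketch; the parameter types are assumptions inferred from the usage (each bucket an Array of tokenized messages), not taken from the actual source.

import spark.RDD

// Hypothetical wrapper; parameter types are assumed, not confirmed.
def abnormallyLargeBuckets(hdfsDir: String,
                           buckets: RDD[Array[Array[String]]],
                           threshold: Int) {
  // Take the large buckets only.
  val abnormal = buckets.filter(_.length >= threshold)
  // Find the size of the largest bucket.
  val largest = abnormal.map(_.length).collect().max
  // Keep only the largest bucket(s) and save them as text, one message per line.
  abnormal.filter(_.length >= largest)
    .map(_.map(_.mkString(" ")).mkString("\n") + "\n")
    .saveAsTextFile(hdfsDir + "largest-buckets")
}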
Filters buckets based on two regexps, .*kernel: ACPI.* and (?i).*panic.*, resulting in messages from the ACPI subsystem and those containing the word PANIC in lower, upper, or mixed case.
Entire code:
import scala.util.matching.Regex

val kernelacpi = """.*kernel: ACPI.*""".r
val icasePanic = """(?i).*panic.*""".r
val matched = buckets.filter(x => x match {
  case kernelacpi() => {
    // do more processing here
    true
  }
  case icasePanic() => {
    println("DEBUG: \"" + x + "\" matches regex.")
    // do more processing here
    true
  }
  case _ => false
})
matched
Usage:
import spark._
import spark.SparkContext._
import spark.timeseries._

/** tbird datafile: */
val hdfsDir = "hdfs://server.name.org:54310/user/hadoop/"
val fileName = "tbird-tagged"
/** The Mesos master server to use: */
val mesosMaster = "mesos://master@host.name.org:5050"
/** Initialize Spark and set caching class: */
val sc = TimeSeriesSpark.init(mesosMaster)
/** Bucket by hour: */
val buckets = TimeSeriesSpark.bucketLogsByHour(sc, hdfsDir + fileName)
// Detect alerts:
val alerts = SuperComputerLogs.detectAlerts(buckets)
// Print only the first message that matches from each hour bucket:
for (k <- alerts) {
  println(k(0).mkString(" "))
}
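Since the filter relies on Scala's Regex extractor patterns, which only match when the regex covers the whole string, a small self-contained illustration may help; the sample lines are made up:

import scala.util.matching.Regex

val kernelacpi = """.*kernel: ACPI.*""".r
val icasePanic = """(?i).*panic.*""".r

// Hypothetical log lines, for illustration only.
val samples = List(
  "Nov 9 12:01:01 en263 kernel: ACPI: Power Button (FF) [PWRF]",
  "Nov 9 12:01:02 en263 kernel: Kernel PANIC - not syncing",
  "Nov 9 12:01:03 en263 crond: session opened")

for (s <- samples) s match {
  case kernelacpi() => println("ACPI match:  " + s)
  case icasePanic() => println("PANIC match: " + s)
  case _            => // neither regex matches
}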
Not done yet: sophisticated alert detection; alert prediction (are there patterns of non-alert messages that tend to precede alerts?).
buckets: the BucketRDD whose messages are examined.
Main program entry point. Runs Spark with the master args(0). Assigns log messages stored in the file args(1) into buckets by hour. Discards buckets smaller than 100 000 messages. Saves buckets larger than 100 000 messages to a file in the same directory as the input file. args(2) needs to be specified, but is currently unused; in the future, it may be used to set the size of a bucket in minutes.
args: the command line arguments.
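A minimal sketch of such an entry point, assuming the helpers shown in the usage examples above; the actual main may differ:

// Hypothetical sketch; only TimeSeriesSpark.init, bucketLogsByHour and
// abnormallyLargeBuckets are taken from this documentation.
def main(args: Array[String]) {
  val master = args(0)     // Spark/Mesos master URL
  val fileName = args(1)   // path of the tagged log file
  // args(2) is required but currently unused; reserved for a bucket
  // size in minutes.
  val sc = TimeSeriesSpark.init(master)
  val buckets = TimeSeriesSpark.bucketLogsByHour(sc, fileName)
  // Keep buckets of at least 100 000 messages and save the largest
  // next to the input file.
  val dir = fileName.substring(0, fileName.lastIndexOf('/') + 1)
  abnormallyLargeBuckets(dir, buckets, 100000)
}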
Find messages from a particular source.
Entire code:
// Keep buckets whose first message has the given source in field 3.
buckets.filter(_(0)(3) == src)
Usage:
import spark._
import spark.SparkContext._
import spark.timeseries._

/** tbird datafile: */
val hdfsDir = "hdfs://server.name.org:54310/user/hadoop/"
val fileName = "tbird-tagged"
/** The Mesos master server to use: */
val mesosMaster = "mesos://master@host.name.org:5050"
/** Initialize Spark and set caching class: */
val sc = TimeSeriesSpark.init(mesosMaster)
/** Bucket by hour: */
val buckets = TimeSeriesSpark.bucketLogsByHour(sc, hdfsDir + fileName)
// Source:
val src = "en263"
// Get the messages from the source:
val all_from_en263 = SuperComputerLogs.messagesWithin(buckets, src).cache()
// Detect interesting properties in them:
val interesting = all_from_en263.filter(...)
for (k <- interesting) {
  println(k.mkString(" "))
}
buckets: the BucketRDD whose messages are examined.
an RDD of Arrays of log messages from the given source.
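The index expression _(0)(3) in the code above selects the fourth whitespace-separated field of the first message in each bucket. A purely hypothetical bucket shows what that field is assumed to contain; the exact field layout of the tagged tbird file is an assumption here:

// Hypothetical tokenized messages; field 3 is assumed to be the source node.
val bucket = Array(
  Array("-", "1131566461", "2005.11.09", "en263", "Nov", "9", "12:01:01"),
  Array("-", "1131566465", "2005.11.09", "en263", "Nov", "9", "12:01:05"))
println(bucket(0)(3)) // prints "en263", the value compared against src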
Find messages within a particular time window.
Entire code:
buckets.filter(x => {
  val xmonth = x(0)(5)
  val xdate = x(0)(6).toInt
  val xhour = x(0)(7).substring(0, 2).toInt
  // Lower bounds are exclusive, upper bounds inclusive.
  dayrange(0) < xdate && dayrange(1) >= xdate &&
  hourrange(0) < xhour && hourrange(1) >= xhour &&
  xmonth == month
})
Usage:
import spark._
import spark.SparkContext._
import spark.timeseries._

/** tbird datafile: */
val hdfsDir = "hdfs://server.name.org:54310/user/hadoop/"
val fileName = "tbird-tagged"
/** The Mesos master server to use: */
val mesosMaster = "mesos://master@host.name.org:5050"
/** Initialize Spark and set caching class: */
val sc = TimeSeriesSpark.init(mesosMaster)
/** Bucket by hour: */
val buckets = TimeSeriesSpark.bucketLogsByHour(sc, hdfsDir + fileName)
// Get the messages of the time period:
val dec10to20workinghours = SuperComputerLogs.messagesWithin(buckets, "Dec", Array(10, 20), Array(8, 17)).cache()
// Detect interesting properties in them:
val interesting = dec10to20workinghours.filter(...)
for (k <- interesting) {
  println(k.mkString(" "))
}
buckets: the BucketRDD whose messages are examined.
month: the month to find messages in. Use a three-letter abbreviation, such as Dec.
dayrange: the range of days for log messages, for example Array(10, 20).
hourrange: the range of hours of log messages, for example Array(8, 17).
an RDD of Arrays of log messages within the time window.
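Note that the comparisons in the filter are exclusive at the lower bound and inclusive at the upper. A short sketch that prints which days and hours the example ranges actually select:

val dayrange = Array(10, 20)
val hourrange = Array(8, 17)
// Days selected: 11 through 20 (10 itself is excluded).
val days = (1 to 31).filter(d => dayrange(0) < d && dayrange(1) >= d)
// Hours selected: 9 through 17 (8 itself is excluded).
val hours = (0 to 23).filter(h => hourrange(0) < h && hourrange(1) >= h)
println(days.mkString(", "))
println(hours.mkString(", "))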