diff --git a/Dream2/.classpath b/Dream2/.classpath index 03be8b6..7eb4300 100644 --- a/Dream2/.classpath +++ b/Dream2/.classpath @@ -14,6 +14,7 @@ + diff --git a/Dream2/dream.properties b/Dream2/dream.properties index 1625048..365f84f 100755 --- a/Dream2/dream.properties +++ b/Dream2/dream.properties @@ -1,4 +1,5 @@ serverAddr localhost serverPort 9000 hostName local -consistencyType single_glitch_free \ No newline at end of file +consistencyType single_glitch_free +enableEvaluation true \ No newline at end of file diff --git a/Dream2/eval/ec2_scripts/distribute.sh b/Dream2/eval/ec2_scripts/distribute.sh new file mode 100644 index 0000000..e09e750 --- /dev/null +++ b/Dream2/eval/ec2_scripts/distribute.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +source nodes.sh + +for i in "${nodes[@]}" +do + echo 'Sending files to '$i + scp -r $1 $i: +done diff --git a/Dream2/eval/ec2_scripts/gather.sh b/Dream2/eval/ec2_scripts/gather.sh new file mode 100644 index 0000000..96a6449 --- /dev/null +++ b/Dream2/eval/ec2_scripts/gather.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +source nodes.sh + +for i in "${nodes[@]}" +do + echo 'Gathering files from '$i + scp $i:*.txt ./results/ +done diff --git a/Dream2/eval/ec2_scripts/killall.sh b/Dream2/eval/ec2_scripts/killall.sh new file mode 100644 index 0000000..bb41a2f --- /dev/null +++ b/Dream2/eval/ec2_scripts/killall.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +source nodes.sh + +for i in "${nodes[@]}" +do + echo 'Killing java processes in '$i + ssh $i killall -9 java +done diff --git a/Dream2/eval/ec2_scripts/nodes.sh b/Dream2/eval/ec2_scripts/nodes.sh new file mode 100644 index 0000000..8c76607 --- /dev/null +++ b/Dream2/eval/ec2_scripts/nodes.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +broker1=ip-172-31-43-217.eu-central-1.compute.internal +broker2=ip-172-31-47-82.eu-central-1.compute.internal +broker3=ip-172-31-41-23.eu-central-1.compute.internal +broker4=ip-172-31-32-245.eu-central-1.compute.internal +broker5=ip-172-31-34-178.eu-central-1.compute.internal +broker6=ip-172-31-39-44.eu-central-1.compute.internal +broker7=ip-172-31-44-11.eu-central-1.compute.internal +broker8=ip-172-31-47-133.eu-central-1.compute.internal +broker9=ip-172-31-44-19.eu-central-1.compute.internal +broker10=ip-172-31-42-224.eu-central-1.compute.internal + +lock_manager=ip-172-31-39-95.eu-central-1.compute.internal + +client=ip-172-31-43-235.eu-central-1.compute.internal + +nodes=($broker1 $broker2 $broker3 $broker4 $broker5 $broker6 $broker7 $broker8 $broker9 $broker10 $lock_manager $client) diff --git a/Dream2/eval/ec2_scripts/start_local.sh b/Dream2/eval/ec2_scripts/start_local.sh new file mode 100644 index 0000000..9286310 --- /dev/null +++ b/Dream2/eval/ec2_scripts/start_local.sh @@ -0,0 +1,13 @@ +java -cp DREAM.jar dream.eval.StartLockManager lock-manager 9999 & +sleep 1 +java -cp DREAM.jar dream.eval.StartServer server1 9001 & +sleep 1 +java -cp DREAM.jar dream.eval.StartServer server2 9002 reds-tcp:localhost:9001 & +sleep 1 +java -cp DREAM.jar dream.eval.EvalVarClient reds-tcp:localhost:9001 reds-tcp:localhost:9999 client1 v1 10000 500 & +sleep 1 +java -cp DREAM.jar dream.eval.EvalVarClient reds-tcp:localhost:9002 reds-tcp:localhost:9999 client2 v2 10000 500 & +sleep 1 +java -cp DREAM.jar dream.eval.EvalSignalClient reds-tcp:localhost:9001 reds-tcp:localhost:9999 client3 v3 v1@client1:v2@client2 & +sleep 1 +java -cp DREAM.jar dream.eval.EvalSignalClient reds-tcp:localhost:9002 reds-tcp:localhost:9999 client4 v4 v3@client3 & diff --git a/Dream2/eval/ec2_scripts/start_remote.sh b/Dream2/eval/ec2_scripts/start_remote.sh new file mode 100644 index 0000000..8471757 --- /dev/null +++ b/Dream2/eval/ec2_scripts/start_remote.sh @@ -0,0 +1,156 @@ +#!/bin/bash + +source nodes.sh + +#j=java +j=./jdk1.8.0_144/jre/bin/java + +reds_lock_manager='reds-tcp:'$lock_manager':9999' +reds_broker1='reds-tcp:'$broker1':9000' +reds_broker2='reds-tcp:'$broker2':9000' +reds_broker3='reds-tcp:'$broker3':9000' +reds_broker4='reds-tcp:'$broker4':9000' +reds_broker5='reds-tcp:'$broker5':9000' +reds_broker6='reds-tcp:'$broker6':9000' +reds_broker7='reds-tcp:'$broker7':9000' +reds_broker8='reds-tcp:'$broker8':9000' +reds_broker9='reds-tcp:'$broker9':9000' +reds_broker10='reds-tcp:'$broker10':9000' + +ssh $lock_manager $j -cp DREAM.jar dream.eval.StartLockManager lock-manager 9999 & +sleep 1 + +ssh $broker1 $j -cp DREAM.jar dream.eval.StartServer broker1 9000 & +sleep 1 +ssh $broker2 $j -cp DREAM.jar dream.eval.StartServer broker2 9000 $reds_broker1 & +sleep 1 +ssh $broker3 $j -cp DREAM.jar dream.eval.StartServer broker3 9000 $reds_broker1 & +sleep 1 +ssh $broker4 $j -cp DREAM.jar dream.eval.StartServer broker4 9000 $reds_broker1 & +sleep 1 +ssh $broker5 $j -cp DREAM.jar dream.eval.StartServer broker5 9000 $reds_broker1 & +sleep 1 +ssh $broker6 $j -cp DREAM.jar dream.eval.StartServer broker6 9000 $reds_broker1 & +sleep 1 +ssh $broker7 $j -cp DREAM.jar dream.eval.StartServer broker7 9000 $reds_broker3 & +sleep 1 +ssh $broker8 $j -cp DREAM.jar dream.eval.StartServer broker8 9000 $reds_broker3 & +sleep 1 +ssh $broker9 $j -cp DREAM.jar dream.eval.StartServer broker9 9000 $reds_broker2 & +sleep 1 +ssh $broker10 $j -cp DREAM.jar dream.eval.StartServer broker10 9000 $reds_broker2 & +sleep 1 + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker4 $reds_lock_manager v1 v1 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker5 $reds_lock_manager v2 v2 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker6 $reds_lock_manager v3 v3 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v4 v4 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v5 v5 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker4 $reds_lock_manager v6 v6 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker5 $reds_lock_manager v7 v7 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker6 $reds_lock_manager v8 v8 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v9 v9 80000 2000 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v10 v10 80000 2000 & +sleep 1 + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager a11 a11 v1@v1:v6@v6 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager a12 a12 v1@v1 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager a21 a21 a11@a11:a12@a12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager a22 a22 a12@a12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager a31 a31 a21@a21:a22@a22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager a32 a32 a22@a22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager a41 a41 a31@a31:a32@a32 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager a42 a42 a32@a32 & +sleep 1 + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager b11 b11 v2@v2:v7@v7 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager b12 b12 v2@v2 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b21 b21 a12@a12:b11@b11:b12@b12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b22 b22 b12@b12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b31 b31 b21@b21:b22@b22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager b32 b32 b22@b22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager b41 b41 b31@b31:b32@b32 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager b42 b42 b32@b32 & + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager c11 c11 v3@v3:v8@v8 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager c12 c12 v3@v3 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager c21 c21 b12@b12:c11@c11:c12@c12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager c22 c22 c12@c12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager c31 c31 c21@c21:c22@c22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager c32 c32 c22@c22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager c41 c41 c31@c31:c32@c32 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager c42 c42 c32@c32 & + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d11 d11 v4@v4:v9@v9 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d12 d12 v4@v4 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d21 d21 c12@c12:d11@d11:d12@d12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d22 d22 d12@d12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager d31 d31 d21@d21:d22@d22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager d32 d32 d22@d22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d41 d41 d31@d31:d32@d32 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d42 d42 d32@d32 & + +### + +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e11 e11 v5@v5:v10@v10 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e12 e12 v5@v5 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e21 e21 d12@d12:e11@e11:e12@e12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e22 e22 e12@e12 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager e31 e31 e21@e21:e22@e22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager e32 e32 e22@e22 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e41 e41 e31@e31:e32@e32 & +sleep 1 +ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e42 e42 e32@e32 & diff --git a/Dream2/eval/process_scripts/process_results.py b/Dream2/eval/process_scripts/process_results.py new file mode 100644 index 0000000..61457b6 --- /dev/null +++ b/Dream2/eval/process_scripts/process_results.py @@ -0,0 +1,98 @@ +from math import sqrt +import os +import scipy.stats + +results_dir = "./results/" +running_time = 300 * 2 + +def extractList(predicate): + global results_dir + lst = [] + for file in filter(predicate, os.listdir(results_dir)): + if file.endswith(".txt"): + f = open(results_dir + "/" + file, "r") + lines = f.readlines() + for i in range(0, len(lines)): + lst.append(float(lines[i])) + f.close() + + return lst + + +def sumTraffic(predicate, column): + global results_dir + trafficMap = {} + for file in filter(predicate, os.listdir(results_dir)): + if file.endswith(".txt"): + f = open(results_dir + "/" + file, "r") + lines = f.readlines() + for i in range(0, len(lines)): + line = lines[i] + pkt = line.split()[0] + val = float(line.split()[column]) + if pkt in trafficMap: + val = val + trafficMap[pkt] + trafficMap[pkt] = val + f.close() + + return trafficMap + + +def sumTrafficPkts(predicate): + return sumTraffic(predicate, 1) + + +def sumTrafficKBS(predicate): + return sumTraffic(predicate, 2) + + +def confidenceDelay(lst): + sum = 0.0 + sumSquare = 0.0 + count = 0 + + for val in lst: + sum = sum + val + sumSquare = sumSquare+(val**2) + count = count+1 + + mean = sum/count + stdDev = sqrt(abs(sumSquare/count-(mean**2))*count/(count-1)) + confidence = (-scipy.stats.t.ppf(0.05,count-1).sum()*stdDev)/sqrt(count) + + return confidence + + +def printDelay(protocol): + print(protocol) + for level in range(1,5): + lst = extractList(lambda x: x[1] == str(level) and "delay" in x and protocol in x) + avg = sum(lst)/len(lst) + confidence = confidenceDelay(lst) + print(str(level) + " " + str(avg) + " " + str(confidence)) + lst = extractList(lambda x: "delay" in x and protocol in x) + avg = sum(lst)/len(lst) + confidence = confidenceDelay(lst) + print("Total " + str(avg) + " " + str(confidence)) + + +def printTraffic(protocol): + global running_time + print(protocol) + pktsMap = sumTrafficPkts(lambda x: "traffic" in x and protocol in x) + kbsMap = sumTrafficKBS(lambda x: "traffic" in x and protocol in x) + for (k, v) in pktsMap.items(): + print(k + "\t" + str(v) + " pkts\t" + str(kbsMap[k]/(running_time*1000)) + " KB/s") + + +print("*** Delay ***") +printDelay("Causal") +printDelay("Single_source_glitch_free") +printDelay("Complete_glitch_free") +printDelay("Atomic") + +print("*** Traffic ***") +printTraffic("Causal") +printTraffic("Single_source_glitch_free") +printTraffic("Complete_glitch_free") +printTraffic("Atomic") diff --git a/Dream2/eval/results/results.tgz b/Dream2/eval/results/results.tgz new file mode 100644 index 0000000..3333228 Binary files /dev/null and b/Dream2/eval/results/results.tgz differ diff --git a/Dream2/eval/results/results_summary.txt b/Dream2/eval/results/results_summary.txt new file mode 100644 index 0000000..27fbdaa --- /dev/null +++ b/Dream2/eval/results/results_summary.txt @@ -0,0 +1,48 @@ +*** Delay *** +Causal +1 13.6975947064 0.345167393789 +2 36.2754037053 0.4300273317 +3 57.9256247251 0.497485644096 +4 89.0463249322 0.477204840466 +Total 56.7127674806 0.349006684908 +Single_source_glitch_free +1 10.7324043987 0.109840628571 +2 45.1498443837 0.290770992685 +3 65.2101957994 0.444436697569 +4 89.8191998031 0.473445153253 +Total 55.0853281736 0.365671139301 +Complete_glitch_free +1 27.249077988 0.765836304213 +2 56.6824640183 0.544100893748 +3 76.188045226 0.655165600347 +4 92.2011379937 0.767327098272 +Total 65.0915597017 0.430133655038 +Atomic +1 40.6880773415 0.683120096158 +2 71.9482150787 0.547210917915 +3 86.8404346433 0.589093498436 +4 102.820055687 0.61606537902 +Total 77.5325273668 0.390049951696 +*** Traffic *** +Causal +__DREAM_PUBLICATION_PACKET_SUBJECT 123300.0 pkts 113.0 KB/s +__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s +__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s +Single_source_glitch_free +__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 93.475 KB/s +__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s +__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s +Complete_glitch_free +__DREAM_LOCK_REQUEST_PACKET_SUBJECT 3000.0 pkts 3.49 KB/s +__DREAM_LOCK_GRANT_PACKET_SUBJECT 3000.0 pkts 0.845 KB/s +__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 103.35 KB/s +__DREAM_LOCK_RELEASE_PACKET_SUBJECT 15600.0 pkts 4.446 KB/s +__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s +__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s +Atomic +__DREAM_LOCK_REQUEST_PACKET_SUBJECT 8365.0 pkts 9.21751666667 KB/s +__DREAM_LOCK_GRANT_PACKET_SUBJECT 8365.0 pkts 2.35614166667 KB/s +__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 97.44 KB/s +__DREAM_LOCK_RELEASE_PACKET_SUBJECT 11064.0 pkts 3.15324 KB/s +__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s +__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s \ No newline at end of file diff --git a/Dream2/eval/topology/topology.graffle b/Dream2/eval/topology/topology.graffle new file mode 100644 index 0000000..1c4d727 Binary files /dev/null and b/Dream2/eval/topology/topology.graffle differ diff --git a/Dream2/eval/topology/topology.pdf b/Dream2/eval/topology/topology.pdf new file mode 100644 index 0000000..0ca475e Binary files /dev/null and b/Dream2/eval/topology/topology.pdf differ diff --git a/Dream2/src/eval/java/dream/eval/EvalSignalClient.java b/Dream2/src/eval/java/dream/eval/EvalSignalClient.java new file mode 100755 index 0000000..3f6ffc2 --- /dev/null +++ b/Dream2/src/eval/java/dream/eval/EvalSignalClient.java @@ -0,0 +1,104 @@ +package dream.eval; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.logging.Logger; + +import dream.client.DreamClient; +import dream.client.LockToken; +import dream.client.RemoteVar; +import dream.client.Signal; +import dream.client.UpdateProducer; +import dream.common.ConsistencyType; +import dream.common.Consts; + +public class EvalSignalClient { + + public static void main(String args[]) { + if (args.length < 5) { + System.out.println( + "Usage: EvalSignalClient [:]+"); + System.exit(0); + } + + final String serverAddr = args[0]; + final String lockMgrAddr = args[1]; + final String hostName = args[2]; + final String signalName = args[3]; + final String deps = args[4]; + + Consts.serverAddr = serverAddr; + Consts.lockManagerAddr = lockMgrAddr; + + Consts.setHostName(hostName); + + final DreamClient client = DreamClient.instance; + client.connect(); + + // TODO ... + try { + Thread.sleep(60000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + + StringTokenizer tokenizer = new StringTokenizer(deps, ":"); + final List relevantRemoteVars = new ArrayList<>(); + + while (tokenizer.hasMoreTokens()) { + relevantRemoteVars.add(tokenizer.nextToken()); + } + + // Wait until all remote vars have been notified + while (!client.listVariables().containsAll(relevantRemoteVars)) { + try { + Thread.sleep(500); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + } + + // Create RemoteVar objects + final List> remoteVars = new ArrayList<>(); + for (String remoteVar : relevantRemoteVars) { + remoteVars.add(new RemoteVar(remoteVar.split("@")[1], remoteVar.split("@")[0])); + } + + @SuppressWarnings("unchecked") + final Signal signal = new Signal(signalName, () -> { + int result = 0; + for (UpdateProducer remoteVar : remoteVars) { + result += ((RemoteVar) remoteVar).get(); + } + return result; + }, 1, remoteVars); + + final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); + signal.change().addHandler((oldVal, val) -> logger.fine("Signal: " + val)); + + if (Consts.consistencyType == ConsistencyType.ATOMIC) { + final Set readLock = new HashSet<>(); + readLock.add(signalName + "@" + hostName); + new Thread(() -> { + while (true) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LockToken token = client.readLock(readLock); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + client.unlock(token); + } + }).start(); + } + + } +} diff --git a/Dream2/src/eval/java/dream/eval/EvalVarClient.java b/Dream2/src/eval/java/dream/eval/EvalVarClient.java new file mode 100755 index 0000000..ac1e05f --- /dev/null +++ b/Dream2/src/eval/java/dream/eval/EvalVarClient.java @@ -0,0 +1,64 @@ +package dream.eval; + +import java.util.Random; +import java.util.logging.Logger; + +import dream.client.DreamClient; +import dream.client.Var; +import dream.common.Consts; + +public class EvalVarClient { + private static final int numChanges = 300; + + public static void main(String args[]) { + if (args.length < 6) { + System.out.println( + "Usage: EvalVarClient "); + System.exit(1); + } + + final String serverAddr = args[0]; + final String lockMgrAddr = args[1]; + final String hostName = args[2]; + final String varName = args[3]; + final int startTime = Integer.parseInt(args[4]); + final int sleepTime = Integer.parseInt(args[5]); + + Consts.serverAddr = serverAddr; + Consts.lockManagerAddr = lockMgrAddr; + + Consts.setHostName(hostName); + + final DreamClient client = DreamClient.instance; + client.connect(); + + // TODO ... + try { + Thread.sleep(80000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + + final Var remoteInt = new Var(varName, 1); + final Random random = new Random(); + + try { + Thread.sleep(startTime); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + + for (int i = 0; i < numChanges; ++i) { + remoteInt.set(random.nextInt(1000)); + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + } + + final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); + logger.info(hostName + " finished sending updates"); + } + +} diff --git a/Dream2/src/eval/java/dream/eval/StartLockManager.java b/Dream2/src/eval/java/dream/eval/StartLockManager.java new file mode 100644 index 0000000..9f6c1a2 --- /dev/null +++ b/Dream2/src/eval/java/dream/eval/StartLockManager.java @@ -0,0 +1,69 @@ +package dream.eval; + +import java.io.IOException; +import java.util.logging.Logger; + +import dream.common.Consts; +import dream.common.packets.discovery.LockManagerHelloPacket; +import dream.common.packets.locking.LockReleasePacket; +import dream.common.packets.locking.LockRequestPacket; +import dream.locking.LockManagerForwarder; +import polimi.reds.NodeDescriptor; +import polimi.reds.broker.overlay.GenericOverlay; +import polimi.reds.broker.overlay.NeighborhoodChangeListener; +import polimi.reds.broker.overlay.NotRunningException; +import polimi.reds.broker.overlay.Overlay; +import polimi.reds.broker.overlay.SimpleTopologyManager; +import polimi.reds.broker.overlay.TCPTransport; +import polimi.reds.broker.overlay.TopologyManager; +import polimi.reds.broker.overlay.Transport; +import polimi.reds.broker.routing.GenericRouter; + +public class StartLockManager implements NeighborhoodChangeListener { + private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); + private final Overlay overlay; + + public static void main(String args[]) { + if (args.length < 2) { + System.out.println("Usage: StartLockManager "); + System.exit(0); + } + Consts.setHostName(args[0]); + Consts.lockManagerPort = Integer.parseInt(args[1]); + new StartLockManager(); + } + + private StartLockManager() { + logger.info("Starting lock manager"); + final Transport tr = new TCPTransport(Consts.lockManagerPort); + final TopologyManager tm = new SimpleTopologyManager(); + overlay = new GenericOverlay(tm, tr); + final GenericRouter router = new GenericRouter(overlay); + final LockManagerForwarder forwarder = new LockManagerForwarder(); + router.setPacketForwarder(LockRequestPacket.subject, forwarder); + router.setPacketForwarder(LockReleasePacket.subject, forwarder); + overlay.addNeighborhoodChangeListener(this); + overlay.start(); + logger.info("Lock manager started"); + } + + @Override + public void notifyNeighborAdded(NodeDescriptor sender) { + try { + overlay.send(LockManagerHelloPacket.subject, new LockManagerHelloPacket(), sender); + } catch (IOException | NotRunningException e) { + e.printStackTrace(); + } + } + + @Override + public void notifyNeighborDead(NodeDescriptor sender) { + // Nothing to do + } + + @Override + public void notifyNeighborRemoved(NodeDescriptor sender) { + // Nothing to do + } + +} diff --git a/Dream2/src/eval/java/dream/eval/StartServer.java b/Dream2/src/eval/java/dream/eval/StartServer.java new file mode 100755 index 0000000..39ce953 --- /dev/null +++ b/Dream2/src/eval/java/dream/eval/StartServer.java @@ -0,0 +1,84 @@ +package dream.eval; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.MalformedURLException; +import java.util.Arrays; +import java.util.List; +import java.util.logging.Logger; + +import dream.common.Consts; +import dream.common.packets.AdvertisementPacket; +import dream.common.packets.EventPacket; +import dream.common.packets.SubscriptionPacket; +import dream.common.packets.discovery.ServerHelloPacket; +import dream.server.ServerEventForwarder; +import polimi.reds.NodeDescriptor; +import polimi.reds.broker.overlay.GenericOverlay; +import polimi.reds.broker.overlay.NeighborhoodChangeListener; +import polimi.reds.broker.overlay.NotRunningException; +import polimi.reds.broker.overlay.Overlay; +import polimi.reds.broker.overlay.SimpleTopologyManager; +import polimi.reds.broker.overlay.TCPTransport; +import polimi.reds.broker.overlay.TopologyManager; +import polimi.reds.broker.overlay.Transport; +import polimi.reds.broker.routing.GenericRouter; + +public class StartServer implements NeighborhoodChangeListener { + private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); + private Overlay overlay = null; + + public static void main(String[] args) { + if (args.length < 2) { + System.out.println("Usage: StartServer [reds-tcp:address:port]*"); + System.exit(0); + } + Consts.setHostName(args[0]); + int port = Integer.parseInt(args[1]); + List neighbors = Arrays.asList(args); + new StartServer(port, neighbors.subList(2, neighbors.size())); + } + + private StartServer(final int port, final List neighbors) { + logger.info("Starting server"); + final Transport tr = new TCPTransport(port); + final TopologyManager tm = new SimpleTopologyManager(); + overlay = new GenericOverlay(tm, tr); + final GenericRouter router = new GenericRouter(overlay); + final ServerEventForwarder forwarder = new ServerEventForwarder(overlay); + overlay.addNeighborhoodChangeListener(forwarder); + router.setPacketForwarder(EventPacket.subject, forwarder); + router.setPacketForwarder(SubscriptionPacket.subject, forwarder); + router.setPacketForwarder(AdvertisementPacket.subject, forwarder); + overlay.addNeighborhoodChangeListener(this); + overlay.start(); + logger.info("Server started"); + neighbors.forEach(n -> { + try { + overlay.addNeighbor(n); + } catch (ConnectException | MalformedURLException | NotRunningException e) { + e.printStackTrace(); + } + }); + } + + @Override + public void notifyNeighborAdded(NodeDescriptor sender) { + try { + overlay.send(ServerHelloPacket.subject, new ServerHelloPacket(), sender); + } catch (IOException | NotRunningException e) { + e.printStackTrace(); + } + } + + @Override + public void notifyNeighborDead(NodeDescriptor sender) { + // Nothing to do + } + + @Override + public void notifyNeighborRemoved(NodeDescriptor sender) { + // Nothing to do + } + +} diff --git a/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java b/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java new file mode 100644 index 0000000..45f4dbb --- /dev/null +++ b/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java @@ -0,0 +1,58 @@ +package dream.eval.utils; + +import java.io.ByteArrayOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Map; + +import dream.common.Consts; + +public class EvalUtils { + + public static final void updateTraffic(Serializable pkt, String subject, final Map trafficPkts, + final Map trafficBytes) { + long currentPkts = trafficPkts.containsKey(subject) ? trafficPkts.get(subject) : 0; + trafficPkts.put(subject, currentPkts + 1); + long currentBytes = trafficBytes.containsKey(subject) ? trafficBytes.get(subject) : 0; + trafficBytes.put(subject, currentBytes + sizeof(pkt)); + } + + private static final int sizeof(Serializable pkt) { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try { + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteOutputStream); + objectOutputStream.writeObject(pkt); + objectOutputStream.flush(); + objectOutputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return byteOutputStream.toByteArray().length; + } + + public static final void saveTrafficToFile(final Map trafficPkts, + final Map trafficBytes) { + try { + FileWriter writer = new FileWriter(Consts.trafficMeasurementFile, false); + for (String subject : trafficPkts.keySet()) { + writer.write(subject + "\t" + trafficPkts.get(subject) + "\t" + trafficBytes.get(subject) + "\n"); + } + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void saveDelayToFile(int count, double sum) { + try { + FileWriter writer = new FileWriter(Consts.delayMeasurementFile, true); + writer.write(sum / count + "\n"); + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff --git a/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java b/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java index e490ae4..1c89ec3 100644 --- a/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java +++ b/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java @@ -11,7 +11,7 @@ public class ConsumerApp { public ConsumerApp() throws Exception { // App will be running on host different from the producer - Consts.hostName = "Host2"; + Consts.setHostName("Host2"); // Register a Subscription RemoteVar rv = new RemoteVar("Host1", "exVar"); @@ -20,7 +20,7 @@ public ConsumerApp() throws Exception { // trigger appropriate action Signal s = new Signal("s", () -> { return rv.get() + "ABC"; - } , rv); + }, "", rv); // Register a handler which will be executed upon receiving the signal s.change().addHandler((oldVal, val) -> System.out.println("Signal1: " + val)); diff --git a/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java b/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java index 16f743d..079868b 100644 --- a/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java +++ b/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java @@ -10,7 +10,7 @@ public class ProducerApp { public ProducerApp() throws Exception { // Mention the host(node) which is producing the value - Consts.hostName = "Host1"; + Consts.setHostName("Host1"); // myVar is created and registered as exVar for remote consumption. // exVar is initialized to AAA diff --git a/Dream2/src/examples/java/dream/examples/chat/core/Chat.java b/Dream2/src/examples/java/dream/examples/chat/core/Chat.java index c80227e..20468bf 100644 --- a/Dream2/src/examples/java/dream/examples/chat/core/Chat.java +++ b/Dream2/src/examples/java/dream/examples/chat/core/Chat.java @@ -43,7 +43,7 @@ public Chat(String username, int window_x, int window_y) { return new ArrayList(); else return registeredClients.get(); - }, registeredClients); + }, new ArrayList(), registeredClients); onlineList.change().addHandler((o, n) -> { if (n.contains("toServerVar@" + username) && gui == null) setup(); @@ -95,7 +95,7 @@ private void setup() { return remote.get(); else return ""; - }, remote); + }, "", remote); fromServer.change().addHandler((oldValue, newValue) -> receivedServerMessage(newValue)); logger.fine("Setup: Starting GUI"); @@ -170,7 +170,7 @@ private void createConnection(int roomNumber, String roomName, String clientName return r.get(); else return ""; - }, r); + }, "", r); s.change().addHandler((oldValue, newValue) -> receivedChatMessage(roomNumber, clientName, newValue)); } diff --git a/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java b/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java index 18a61f4..902b4cd 100644 --- a/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java +++ b/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java @@ -83,7 +83,7 @@ private void createNewSessionFor(String clientName, String clientVar) { return var.get(); else return ""; - }, var); + }, "", var); sig.change().addHandler((oldValue, newValue) -> receivedMessage(clientName, newValue)); // add client as registered diff --git a/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java b/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java index 77823f6..4e96902 100644 --- a/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java +++ b/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java @@ -36,9 +36,9 @@ public void start() { f2 = new RemoteVar<>("Model2", "f2"); f3 = new RemoteVar<>("Model3", "f3"); - f1Signal = new Signal<>("f1Signal", () -> f1.get(), f1); - f2Signal = new Signal<>("f2Signal", () -> f2.get(), f2); - f3Signal = new Signal<>("f3Signal", () -> f3.get(), f3); + f1Signal = new Signal<>("f1Signal", () -> f1.get(), 1, f1); + f2Signal = new Signal<>("f2Signal", () -> f2.get(), 1, f2); + f3Signal = new Signal<>("f3Signal", () -> f3.get(), 1, f3); f1Signal.change().addHandler(this); f2Signal.change().addHandler(this); diff --git a/Dream2/src/examples/java/dream/examples/financial/Model1.java b/Dream2/src/examples/java/dream/examples/financial/Model1.java index 3d7e9c9..b4045d2 100644 --- a/Dream2/src/examples/java/dream/examples/financial/Model1.java +++ b/Dream2/src/examples/java/dream/examples/financial/Model1.java @@ -27,7 +27,7 @@ public void start() { } else { return marketIndex.get() * 2 + stockOpts.get(); } - } , marketIndex, stockOpts); + }, 1, marketIndex, stockOpts); f1.change().addHandler((oldVal, newVal) -> System.out.println("New value for f1: " + newVal)); } diff --git a/Dream2/src/examples/java/dream/examples/financial/Model2.java b/Dream2/src/examples/java/dream/examples/financial/Model2.java index a63825b..d23fb75 100644 --- a/Dream2/src/examples/java/dream/examples/financial/Model2.java +++ b/Dream2/src/examples/java/dream/examples/financial/Model2.java @@ -27,7 +27,7 @@ public void start() { } else { return marketIndex.get() + stockOpts.get() * 2; } - } , marketIndex, stockOpts); + }, 1, marketIndex, stockOpts); f2.change().addHandler((oldVal, newVal) -> System.out.println("New value for f2: " + newVal)); } diff --git a/Dream2/src/examples/java/dream/examples/financial/Model3.java b/Dream2/src/examples/java/dream/examples/financial/Model3.java index f85bdce..8683335 100644 --- a/Dream2/src/examples/java/dream/examples/financial/Model3.java +++ b/Dream2/src/examples/java/dream/examples/financial/Model3.java @@ -28,7 +28,7 @@ public void start() { } else { return marketIndex.get() + news.get(); } - } , marketIndex, news); + }, 1, marketIndex, news); f3.change().addHandler((oldVal, newVal) -> System.out.println("New value for f3: " + newVal)); } diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java index df02d5b..095c2c9 100644 --- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java +++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java @@ -13,35 +13,35 @@ protected void createDependencies() { return working_hours.get() > required_hours.get().getFirst(); else return false; - }, working_hours, required_hours); + }, true, working_hours, required_hours); final Signal maximumHours = new Signal<>(MaximumHours, () -> { if (working_hours.get() != null && required_hours.get() != null) return working_hours.get() < required_hours.get().getSecond(); else return false; - }, working_hours, required_hours); + }, true, working_hours, required_hours); final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> { if (euro_per_hour.get() != null) return euro_per_hour.get() > 10; else return false; - }, euro_per_hour); + }, true, euro_per_hour); new Signal<>(SettingsOkay, () -> { if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null) return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get(); else return false; - }, minimumHours, maximumHours, minimumEuroPerHour); + }, true, minimumHours, maximumHours, minimumEuroPerHour); new Signal<>(Salary, () -> { if (working_hours.get() != null && euro_per_hour.get() != null) return working_hours.get() * euro_per_hour.get(); else return 0.0; - }, working_hours, euro_per_hour); + }, true, working_hours, euro_per_hour); logger.fine("Finished building Dependencies"); } diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java index efe026a..b8914e8 100644 --- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java +++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java @@ -43,14 +43,14 @@ protected void start() { return salary.get(); else return 0.0; - }, salary); + }, 0.0, salary); remoteSettings = new Signal<>("remoteSettings", () -> { if (settings.get() != null) return settings.get(); else return false; - }, settings); + }, false, settings); gui.setText("Salary: "); gui.setColor(Color.red); diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java index 3789a9a..075bf85 100644 --- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java +++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java @@ -61,35 +61,35 @@ protected void createDependencies() { return working_hours.get() > required_hours.get().getFirst(); else return false; - }, working_hours, required_hours); + }, false, working_hours, required_hours); final Signal maximumHours = new Signal<>(MaximumHours, () -> { if (working_hours.get() != null && required_hours.get() != null) return working_hours.get() < required_hours.get().getSecond(); else return false; - }, working_hours, required_hours); + }, false, working_hours, required_hours); final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> { if (euro_per_hour.get() != null) return euro_per_hour.get() > 10; else return false; - }, euro_per_hour); + }, false, euro_per_hour); new Signal<>(SettingsOkay, () -> { if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null) return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get(); else return false; - }, minimumHours, maximumHours, minimumEuroPerHour); + }, false, minimumHours, maximumHours, minimumEuroPerHour); new Signal<>(Salary, () -> { if (working_hours.get() != null && euro_per_hour.get() != null) return working_hours.get() * euro_per_hour.get(); else return 0.0; - }, working_hours, euro_per_hour); + }, 0.0, working_hours, euro_per_hour); logger.fine("Finished building Dependencies"); } diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java index 7b88796..dc68b9a 100644 --- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java +++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java @@ -26,7 +26,7 @@ public LockClient(String name) { return new ArrayList>(); else return registeredClients.get(); - }, registeredClients); + }, new ArrayList>(), registeredClients); s.change().addHandler((o, n) -> { if (n.contains(new Pair<>(this.getHostName(), LockManager.VAR_requestLock)) && setup == false) lockSetup(); @@ -42,7 +42,7 @@ private void lockSetup() { RemoteVar lock = new RemoteVar<>(LockManager.NAME, LockManager.VAR_lock); Signal sLock = new Signal<>("lock", () -> { return lock.get(); - }, lock); + }, null, lock); sLock.change().addHandler((oldValue, newValue) -> { newValue.forEach((var, client) -> { if (client.equals(getHostName())) diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java index d925fd0..73fc517 100644 --- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java +++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java @@ -60,7 +60,7 @@ private void createNewSessionFor(Variable x) { return rv.get(); else return null; - }, rv); + }, null, rv); s.change().addHandler((oldValue, newValue) -> { if (newValue.isLockRequest()) { // client requesting a lock diff --git a/Dream2/src/examples/java/dream/examples/form/core/FormClient.java b/Dream2/src/examples/java/dream/examples/form/core/FormClient.java index dd77f4d..3f2a831 100644 --- a/Dream2/src/examples/java/dream/examples/form/core/FormClient.java +++ b/Dream2/src/examples/java/dream/examples/form/core/FormClient.java @@ -44,14 +44,14 @@ protected void start() { return salary.get(); else return 0.0; - }, salary); + }, 0.0, salary); remoteSettings = new Signal<>("remoteSettings", () -> { if (settings.get() != null) return settings.get(); else return false; - }, settings); + }, false, settings); gui.setText("Salary: "); gui.setColor(Color.red); diff --git a/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java b/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java index 3a56c02..851e5b3 100644 --- a/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java +++ b/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java @@ -48,35 +48,35 @@ protected void createDependencies() { return working_hours.get() > 10; else return false; - }, working_hours); + }, false, working_hours); final Signal maximumHours = new Signal<>(MaximumHours, () -> { if (working_hours.get() != null) return working_hours.get() < 60; else return false; - }, working_hours); + }, false, working_hours); final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> { if (euro_per_hour.get() != null) return euro_per_hour.get() > 10; else return false; - }, euro_per_hour); + }, false, euro_per_hour); new Signal<>(SettingsOkay, () -> { if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null) return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get(); else return false; - }, minimumHours, maximumHours, minimumEuroPerHour); + }, false, minimumHours, maximumHours, minimumEuroPerHour); new Signal<>(Salary, () -> { if (working_hours.get() != null && euro_per_hour.get() != null) return working_hours.get() * euro_per_hour.get(); else return 0.0; - }, working_hours, euro_per_hour); + }, 0.0, working_hours, euro_per_hour); logger.fine("Finished building Dependencies"); } diff --git a/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java b/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java index d98caab..3edcb2c 100644 --- a/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java +++ b/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java @@ -23,15 +23,15 @@ protected void createDependencies() { minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> { return euro_per_hour.get() > 10; - }, euro_per_hour); + }, false, euro_per_hour); final Signal minimumHours = new Signal<>(MinimumHours, () -> { return working_hours.get() > 10; - }, working_hours); + }, false, working_hours); final Signal maximumHours = new Signal<>(MaximumHours, () -> { return working_hours.get() < 60; - }, working_hours); + }, false, working_hours); minimumEuroPerHour.change().addHandler((o, n) -> updateSettingsOkay()); @@ -50,7 +50,7 @@ protected void createDependencies() { return working_hours.get() * euro_per_hour.get(); else return 0.0; - }, working_hours, euro_per_hour); + }, 0.0, working_hours, euro_per_hour); logger.fine("Finished building Dependencies"); } diff --git a/Dream2/src/examples/java/dream/examples/local/Example1.java b/Dream2/src/examples/java/dream/examples/local/Example1.java index 03f0632..6cd4c7a 100755 --- a/Dream2/src/examples/java/dream/examples/local/Example1.java +++ b/Dream2/src/examples/java/dream/examples/local/Example1.java @@ -13,12 +13,12 @@ public static void main(String args[]) { final Var varString2 = new Var<>("varString2", ""); final Signal signalInt = new Signal("signalInt", - () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, varInt); + () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, 1, varInt); final Signal signalDouble = new Signal("signalDouble", - () -> varDouble.get() + varDouble.get() * 2, varDouble); - final Signal signalBool = new Signal("signalBool", () -> !varBool.get(), varBool); + () -> varDouble.get() + varDouble.get() * 2, 1.0, varDouble); + final Signal signalBool = new Signal("signalBool", () -> !varBool.get(), false, varBool); final Signal signalString = new Signal("signalString", - () -> varString1.get() + varString2.get(), varString1, varString2); + () -> varString1.get() + varString2.get(), "", varString1, varString2); signalInt.change() .addHandler((oldVal, val) -> System.out.println("signalInt: " + val + " (correct value: 158)")); diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java b/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java index 22292e2..9e0bf74 100644 --- a/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java +++ b/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java @@ -7,9 +7,9 @@ public class ExampleFilter { public static void main(String args[]) { final Var varInt = new Var<>("varInt", 1); - final Signal signalInt = new Signal<>("signalInt", () -> varInt.get() + 1, + final Signal signalInt = new Signal<>("signalInt", () -> varInt.get() + 1, 1, varInt.filter(val -> val > 10)); - final Signal signalInt2 = new Signal<>("signalInt2", () -> signalInt.get() + 1, + final Signal signalInt2 = new Signal<>("signalInt2", () -> signalInt.get() + 1, 1, signalInt.filter(val -> val > 20)); signalInt.change().addHandler((oldVal, val) -> System.out.println("SignalInt: " + val)); diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java b/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java index 70fe13b..3aa0d27 100755 --- a/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java +++ b/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java @@ -13,10 +13,10 @@ public static void main(String args[]) { public void launch() { final Var var = new Var<>("var", 1.0); - final Signal mid1 = new Signal<>("mid1", () -> var.get() * 2, var); - final Signal mid2 = new Signal<>("mid2", () -> var.get() * 3, var); + final Signal mid1 = new Signal<>("mid1", () -> var.get() * 2, 1.0, var); + final Signal mid2 = new Signal<>("mid2", () -> var.get() * 3, 1.0, var); - final Signal finalResult = new Signal<>("final", () -> mid1.get() + mid2.get(), mid1, mid2); + final Signal finalResult = new Signal<>("final", () -> mid1.get() + mid2.get(), 1.0, mid1, mid2); finalResult.change().addHandler((oldVal, val) -> System.out.println(val)); try { diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleList.java b/Dream2/src/examples/java/dream/examples/local/ExampleList.java index 4e8aa67..def5917 100755 --- a/Dream2/src/examples/java/dream/examples/local/ExampleList.java +++ b/Dream2/src/examples/java/dream/examples/local/ExampleList.java @@ -9,7 +9,8 @@ public class ExampleList { public static void main(String args[]) { final Var> varList = new Var<>("varList", new ArrayList()); - final Signal signalInt = new Signal("signalInt", () -> 1000 + varList.get().size(), varList); + final Signal signalInt = new Signal("signalInt", () -> 1000 + varList.get().size(), 1, + varList); signalInt.change().addHandler((oldVal, val) -> System.out.println(val)); diff --git a/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java b/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java index 9d3eeee..a8e6bdd 100755 --- a/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java +++ b/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java @@ -12,7 +12,7 @@ public class RemoteSignalExample { public static void main(String args[]) { - Consts.hostName = "Signal"; + Consts.setHostName("Signal"); final DreamClient client = DreamClient.instance; client.connect(); @@ -36,12 +36,12 @@ public static void main(String args[]) { final RemoteVar> remoteList = new RemoteVar<>("Remote", "remoteList"); final Signal signal1 = new Signal("signal1", - () -> remoteInt.get() + remoteString1.get().length(), remoteInt, remoteString1); - final Signal signal2 = new Signal("signal2", () -> remoteInt.get(), remoteInt); + () -> remoteInt.get() + remoteString1.get().length(), 1, remoteInt, remoteString1); + final Signal signal2 = new Signal("signal2", () -> remoteInt.get(), 1, remoteInt); final Signal signal3 = new Signal("signal3", () -> remoteString1.get() + remoteString2.get(), - remoteString1, remoteString2); + "", remoteString1, remoteString2); final Signal signal4 = new Signal("signal4", - () -> remoteString1.get().length() + remoteList.get().size(), remoteString1, remoteList); + () -> remoteString1.get().length() + remoteList.get().size(), 1, remoteString1, remoteList); signal1.change().addHandler((oldVal, val) -> System.out.println("Signal1: " + val)); signal2.change().addHandler((oldVal, val) -> System.out.println("Signal2: " + val)); diff --git a/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java b/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java index 2facf23..80b0ca7 100755 --- a/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java +++ b/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java @@ -9,7 +9,7 @@ public class RemoteVarExample { public static void main(String args[]) { - Consts.hostName = "Remote"; + Consts.setHostName("Remote"); final Var remoteInt = new Var("remoteInt", 1); final Var remoteString1 = new Var("remoteString1", "a"); final Var remoteString2 = new Var("remoteString2", "b"); diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java index 7984933..e8333e4 100644 --- a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java +++ b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java @@ -24,7 +24,7 @@ public LockClient(String name) { return new ArrayList>(); else return registeredClients.get(); - }, registeredClients); + }, new ArrayList>(), registeredClients); s.change().addHandler((o, n) -> { if (n.contains(new Pair<>(this.getHostName(), LockManager.VAR_requestLock)) && setup == false) lockSetup(); @@ -39,7 +39,7 @@ private void lockSetup() { RemoteVar lock = new RemoteVar<>(LockManager.NAME, LockManager.VAR_lock); Signal sLock = new Signal<>("lock", () -> { return lock.get(); - }, lock); + }, "", lock); sLock.change().addHandler((oldValue, newValue) -> { if (newValue.equals(getHostName())) hasLock = true; diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java index 835ec14..b807c25 100644 --- a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java +++ b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java @@ -57,7 +57,7 @@ private void createNewSessionFor(Pair x) { return rv.get(); else return false; - }, rv); + }, false, rv); s.change().addHandler((oldValue, newValue) -> { if (newValue) { // client requesting a lock diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java index eb8f12c..92b621b 100644 --- a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java +++ b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java @@ -36,12 +36,12 @@ public Monitor() { devs = new RemoteVar(Server.NAME, Server.VAR_developers); sigDevs = new Signal("sigDevs", () -> { return devs.get(); - }, devs); + }, "", devs); tasks = new RemoteVar(Server.NAME, Server.VAR_tasks); sigTasks = new Signal("sigTests", () -> { return tasks.get(); - }, tasks); + }, "", tasks); sigDevs.change().addHandler((oldVa, newVal) -> { System.out.println("newVal devs:" + newVal); diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java index 7db4274..25c190c 100644 --- a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java +++ b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java @@ -14,8 +14,8 @@ /** * Holds a list of tasks and a list of developers each indicated by a simple - * integer. Searches for new clients (TaskCreater) and registers to their - * "task creation channels" + * integer. Searches for new clients (TaskCreater) and registers to their "task + * creation channels" * * @author Min Yang * @author Tobias Becker @@ -78,7 +78,7 @@ private void initDependencies() { temp.addAll(rv2.get()); Collections.sort(temp); return temp.stream().map(a -> a.getDevString()).collect(Collectors.joining(":")); - }, rv1, rv2); + }, "", rv1, rv2); Signal tasks = new Signal(VAR_tasks, () -> { LinkedList temp = new LinkedList<>(); @@ -88,7 +88,7 @@ private void initDependencies() { temp.addAll(rv2.get()); Collections.sort(temp); return temp.stream().map(a -> a.getTaskString()).collect(Collectors.joining(":")); - }, rv1, rv2); + }, "", rv1, rv2); } diff --git a/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java b/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java index ddab317..db3fc94 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java +++ b/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java @@ -49,13 +49,13 @@ public DeligatProcess() { } private void init() { - Consts.hostName = "Host2"; + Consts.setHostName("Host2"); RemoteVar rv = new RemoteVar("Host1", "TASK"); Var myVar = new Var("TASK_ASSIGNED", null); Signal s = new Signal("s", () -> { return rv.get(); - } , rv); + }, new Message(), rv); // Register a handler which will be executed upon receiving the signal s.change().addHandler((oldVal, val) -> { diff --git a/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java b/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java index d6b3c98..c9e8232 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java +++ b/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java @@ -14,7 +14,7 @@ public class MasterProcess { private void init() { - Consts.hostName = "Host1"; + Consts.setHostName("Host1"); VectorClock vectorClock = new VectorClock("p1"); Var initTask = new Var("TASK", null); diff --git a/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java b/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java index 3d6d2de..003cdb0 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java +++ b/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java @@ -63,7 +63,7 @@ public void isEvent(String val) { public void run() { localClock = new VectorClockHelper("p3", this); - Consts.hostName = "Host3"; + Consts.setHostName("Host3"); Thread t = new Thread(localClock); t.start(); RemoteVar task = new RemoteVar("Host1", "TASK2"); @@ -71,10 +71,10 @@ public void run() { Signal signalFromMaster = new Signal("s", () -> { return task.get(); - } , task); + }, new Message(), task); Signal signalFromDeligator = new Signal("s1", () -> { return taskDeligated.get(); - } , taskDeligated); + }, new Message(), taskDeligated); // Register a handler which will be executed upon receiving the signal // from master process diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java index a2c7610..1e951ce 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java +++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java @@ -49,13 +49,13 @@ public DeligatProcess() { } private void init() { - Consts.hostName = "Host2"; + Consts.setHostName("Host2"); RemoteVar rv = new RemoteVar("Host1", "TASK"); Var myVar = new Var("TASK_ASSIGNED", null); Signal s = new Signal("s", () -> { return rv.get(); - } , rv); + }, new Message(), rv); // Register a handler which will be executed upon receiving the signal s.change().addHandler((oldVal, val) -> { diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java index 3988855..2c35ef6 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java +++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java @@ -14,7 +14,7 @@ public class MasterProcess { private void init() { - Consts.hostName = "Host1"; + Consts.setHostName("Host1"); VectorClock vectorClock = new VectorClock("p1"); Var initTask = new Var("TASK", null); diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java index b78a7a2..7b67332 100644 --- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java +++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java @@ -55,18 +55,17 @@ public void isEvent(String val) { } public void run() { - - Consts.hostName = "Host3"; + Consts.setHostName("Host3"); RemoteVar task = new RemoteVar("Host1", "TASK2"); RemoteVar taskDeligated = new RemoteVar("Host2", "TASK_ASSIGNED"); Signal signalFromMaster = new Signal("s", () -> { return task.get(); - } , task); + }, new Message(), task); Signal signalFromDeligator = new Signal("s1", () -> { return taskDeligated.get(); - } , taskDeligated); + }, new Message(), taskDeligated); // Register a handler which will be executed upon receiving the signal // from master process diff --git a/Dream2/src/examples/java/dream/examples/util/Client.java b/Dream2/src/examples/java/dream/examples/util/Client.java index 4dfcb4d..53b42b4 100644 --- a/Dream2/src/examples/java/dream/examples/util/Client.java +++ b/Dream2/src/examples/java/dream/examples/util/Client.java @@ -38,7 +38,7 @@ public Client(String name) { startDream(); // set hostName - Consts.hostName = name; + Consts.setHostName(name); // connect to the dependency graph DreamClient.instance.connect(); @@ -131,7 +131,7 @@ private static final void startLockManager() throws InterruptedException { } public String getHostName() { - return Consts.hostName; + return Consts.getHostName(); } private LinkedList lockQueue = new LinkedList<>(); diff --git a/Dream2/src/main/java/dream/client/ClientEventForwarder.java b/Dream2/src/main/java/dream/client/ClientEventForwarder.java index 8ab0cfd..3bc7eea 100755 --- a/Dream2/src/main/java/dream/client/ClientEventForwarder.java +++ b/Dream2/src/main/java/dream/client/ClientEventForwarder.java @@ -28,6 +28,7 @@ import dream.common.utils.FinalNodesDetector; import dream.common.utils.InterSourceDependencyDetector; import dream.common.utils.IntraSourceDependencyDetector; +import dream.eval.utils.EvalUtils; import polimi.reds.NodeDescriptor; import polimi.reds.broker.routing.Outbox; import polimi.reds.broker.routing.PacketForwarder; @@ -49,6 +50,9 @@ class ClientEventForwarder implements PacketForwarder { : new CompleteGlitchFreeDependencyDetector(); private final FinalNodesDetector finalNodesDetector = new FinalNodesDetector(); + private final Map trafficPkts = new HashMap<>(); + private final Map trafficBytes = new HashMap<>(); + // Lock applicants waiting for a grant private final Map lockApplicants = new HashMap<>(); @@ -85,6 +89,10 @@ private ClientEventForwarder() { public synchronized Collection forwardPacket(String subject, NodeDescriptor sender, Serializable packet, Collection neighbors, Outbox outbox) { final Collection result = new ArrayList(); + if (Consts.enableEvaluation) { + EvalUtils.updateTraffic(packet, subject, trafficPkts, trafficBytes); + EvalUtils.saveTrafficToFile(trafficPkts, trafficBytes); + } if (subject.equals(AdvertisementPacket.subject)) { assert packet instanceof AdvertisementPacket; logger.finer("Received an advertisement packet " + packet); @@ -107,7 +115,7 @@ public synchronized Collection forwardPacket(String subject, Nod return result; } - final synchronized void sendEvent(UUID id, Event ev, String initialVar) { + final synchronized void sendEvent(UUID id, Event ev, String initialVar, long timestamp) { logger.finer("Sending an event " + ev); Set lockReleaseNodes; switch (Consts.consistencyType) { @@ -122,7 +130,7 @@ final synchronized void sendEvent(UUID id, Event ev, String initialVar) { } if (subTable.needsToDeliverToServer(ev)) { - connectionManager.sendEvent(id, ev, initialVar, lockReleaseNodes); + connectionManager.sendEvent(id, ev, initialVar, lockReleaseNodes, timestamp); } } @@ -253,12 +261,16 @@ final synchronized void removeSubscription(Subscriber subscriber, Subscription sub) { return !isLocal(sub); } private final boolean isLocal(Subscription sub) { - return sub.getHostId().equals(Consts.hostName); + return sub.getHostId().equals(Consts.getHostName()); } private final void processEventFromServer(EventPacket evPkt) { @@ -272,26 +284,26 @@ private final void processEventFromServer(EventPacket evPkt) { } private final void processAdvertisementFromServer(AdvertisementPacket advPkt) { + final Set> subs = advPkt.getSubscriptions(); + switch (advPkt.getAdvType()) { + case ADV: + if (subs.isEmpty()) { + dependencyGraph.processAdv(advPkt.getAdvertisement()); + } else { + dependencyGraph.processAdv(advPkt.getAdvertisement(), subs); + } + break; + case UNADV: + if (subs.isEmpty()) { + dependencyGraph.processUnAdv(advPkt.getAdvertisement()); + } else { + dependencyGraph.processUnAdv(advPkt.getAdvertisement(), subs); + } + break; + } if (Consts.consistencyType == ConsistencyType.SINGLE_SOURCE_GLITCH_FREE || // Consts.consistencyType == ConsistencyType.COMPLETE_GLITCH_FREE || // Consts.consistencyType == ConsistencyType.ATOMIC) { - final Set> subs = advPkt.getSubscriptions(); - switch (advPkt.getAdvType()) { - case ADV: - if (subs.isEmpty()) { - dependencyGraph.processAdv(advPkt.getAdvertisement()); - } else { - dependencyGraph.processAdv(advPkt.getAdvertisement(), subs); - } - break; - case UNADV: - if (subs.isEmpty()) { - dependencyGraph.processUnAdv(advPkt.getAdvertisement()); - } else { - dependencyGraph.processUnAdv(advPkt.getAdvertisement(), subs); - } - break; - } updateDetectors(); } } diff --git a/Dream2/src/main/java/dream/client/ConnectionManager.java b/Dream2/src/main/java/dream/client/ConnectionManager.java index a237bb4..9206d1b 100755 --- a/Dream2/src/main/java/dream/client/ConnectionManager.java +++ b/Dream2/src/main/java/dream/client/ConnectionManager.java @@ -78,9 +78,9 @@ final NodeDescriptor getNodeDescriptor() { return overlay.getNodeDescriptor(); } - final void sendEvent(UUID id, Event event, String initialVar, - Set lockReleaseNodes) { - final EventPacket pkt = new EventPacket(event, id, initialVar); + final void sendEvent(UUID id, Event event, String initialVar, Set lockReleaseNodes, + long timestamp) { + final EventPacket pkt = new EventPacket(event, id, initialVar, timestamp); pkt.setLockReleaseNodes(lockReleaseNodes); sendToServer(EventPacket.subject, pkt); } @@ -177,6 +177,13 @@ public final Collection forwardPacket(String subject, NodeDescri return new ArrayList<>(); } + final boolean isConnected() { + return server != null && // + (lockManager != null || // + (Consts.consistencyType != ConsistencyType.COMPLETE_GLITCH_FREE && // + Consts.consistencyType != ConsistencyType.ATOMIC)); + } + private class PacketSubjectPair { private final String subject; private final Serializable packet; diff --git a/Dream2/src/main/java/dream/client/DreamClient.java b/Dream2/src/main/java/dream/client/DreamClient.java index fe8536f..673a1d3 100644 --- a/Dream2/src/main/java/dream/client/DreamClient.java +++ b/Dream2/src/main/java/dream/client/DreamClient.java @@ -54,6 +54,10 @@ public final void unlock(LockToken token) { } } + public final boolean isConnected() { + return eventForwarder.isConnected(); + } + private class Lock { private boolean unlocked = false; diff --git a/Dream2/src/main/java/dream/client/RemoteVar.java b/Dream2/src/main/java/dream/client/RemoteVar.java index 6e87851..e1f89ea 100755 --- a/Dream2/src/main/java/dream/client/RemoteVar.java +++ b/Dream2/src/main/java/dream/client/RemoteVar.java @@ -42,7 +42,7 @@ public RemoteVar(String host, String object, List> cons } public RemoteVar(String object, List> constraints) { - this(Consts.hostName, object, constraints); + this(Consts.getHostName(), object, constraints); } public RemoteVar(String host, String object) { @@ -50,7 +50,7 @@ public RemoteVar(String host, String object) { } public RemoteVar(String object) { - this(Consts.hostName, object); + this(Consts.getHostName(), object); } public final synchronized T get() { @@ -85,9 +85,10 @@ private void processNextEvent() { private final void sendEventPacketToListeners(EventPacket evPkt) { if (!consumers.isEmpty()) { final Set satConsumers = // - consumers.entrySet().stream().filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))// - .map(e -> e.getKey())// - .collect(Collectors.toSet()); + consumers.entrySet().stream() + .filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))// + .map(e -> e.getKey())// + .collect(Collectors.toSet()); pendingAcks = satConsumers.size(); satConsumers.forEach(c -> c.updateFromProducer(evPkt, this)); diff --git a/Dream2/src/main/java/dream/client/Signal.java b/Dream2/src/main/java/dream/client/Signal.java index 02f7618..c023a28 100755 --- a/Dream2/src/main/java/dream/client/Signal.java +++ b/Dream2/src/main/java/dream/client/Signal.java @@ -21,6 +21,7 @@ import dream.common.packets.content.Advertisement; import dream.common.packets.content.Event; import dream.common.packets.content.Subscription; +import dream.eval.utils.EvalUtils; public class Signal implements TimeChangingValue, UpdateProducer, UpdateConsumer { @@ -41,12 +42,17 @@ public class Signal implements TimeChangingValue, Upd private T val; + // Variables for delay measurement + int updateCount = 0; + double sumOfDelays = 0; + private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); @SuppressWarnings({ "unchecked", "rawtypes" }) - public Signal(String object, Supplier evaluation, UpdateProducer... prods) { - this.host = Consts.hostName; + public Signal(String object, Supplier evaluation, T defaultVal, UpdateProducer... prods) { + this.host = Consts.getHostName(); this.object = object; + this.val = defaultVal; this.evaluation = evaluation; final Set> subs = new HashSet<>(); @@ -56,13 +62,33 @@ public Signal(String object, Supplier evaluation, UpdateProducer... prods) } clientEventForwarder = ClientEventForwarder.get(); - clientEventForwarder.advertise(new Advertisement(Consts.hostName, object), subs, true); + clientEventForwarder.advertise(new Advertisement(Consts.getHostName(), object), subs, true); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Signal(String object, Supplier evaluation, T defaultVal, List> prods) { + this.host = Consts.getHostName(); + this.object = object; + this.val = defaultVal; + this.evaluation = evaluation; + + final Set> subs = new HashSet<>(); + for (final UpdateProducer prod : prods) { + prod.registerUpdateConsumer(this, prod.getConstraints()); + subs.add(new Subscription(prod.getHost(), prod.getObject(), prod.getConstraints())); + } + + clientEventForwarder = ClientEventForwarder.get(); + clientEventForwarder.advertise(new Advertisement(Consts.getHostName(), object), subs, true); } private final synchronized void processNextUpdate() { if (pendingAcks == 0) { // Notify that the previous update has finished if (!waitingProducers.isEmpty()) { + // Set> cache = new + // HashSet<>(waitingProducers); + // cache.forEach(prod -> prod.notifyUpdateFinished()); waitingProducers.forEach(prod -> prod.notifyUpdateFinished()); waitingProducers.clear(); } @@ -75,7 +101,8 @@ private final synchronized void processNextUpdate() { private final void processUpdate(EventProducerPair update) { logger.finest("processTask method invoked with " + update); - final List pairs = queueManager.processEventPacket(update, object + "@" + Consts.hostName); + final List pairs = queueManager.processEventPacket(update, + object + "@" + Consts.getHostName()); logger.finest("The queueManager returned the following pairs " + pairs); if (!pairs.isEmpty()) { @@ -83,29 +110,48 @@ private final void processUpdate(EventProducerPair update) { // Extract information from any of the packets final EventPacket anyPkt = pairs.stream().findAny().get().getEventPacket(); + // Extract the oldest timestamp + long timestamp = pairs.stream().min((a, b) -> { + final long aTime = a.getEventPacket().getGenerationTime(); + final long bTime = b.getEventPacket().getGenerationTime(); + if (aTime == bTime) { + return 0; + } else if (aTime < bTime) { + return -1; + } else { + return 1; + } + }).get().getEventPacket().getGenerationTime(); + // Compute the new value try { val = evaluate(); + if (Consts.enableEvaluation) { + updateCount++; + double delay = (System.nanoTime() - timestamp) / 1000000.0; + sumOfDelays += delay; + EvalUtils.saveDelayToFile(updateCount, sumOfDelays); + } logger.finest("New value computed for the reactive object: " + val); } catch (final Exception e) { - logger.log(Level.INFO, - "Exception during the evaluation of the expression. Acknowledging the producers, releasing the locks, and returning.", - e); - pairs.forEach(pair -> pair.getUpdateProducer().notifyUpdateFinished()); - // Release locks, if needed - if ((Consts.consistencyType == ConsistencyType.COMPLETE_GLITCH_FREE || // - Consts.consistencyType == ConsistencyType.ATOMIC) && // - anyPkt.getLockReleaseNodes().contains(object + "@" + host)) { - clientEventForwarder.sendLockRelease(anyPkt.getId()); - } - return; + logger.log(Level.INFO, "Exception during the evaluation of the expression.", e); + // pairs.forEach(pair -> + // pair.getUpdateProducer().notifyUpdateFinished()); + // // Release locks, if needed + // if ((Consts.consistencyType == + // ConsistencyType.COMPLETE_GLITCH_FREE || // + // Consts.consistencyType == ConsistencyType.ATOMIC) && // + // anyPkt.getLockReleaseNodes().contains(object + "@" + host)) { + // clientEventForwarder.sendLockRelease(anyPkt.getId()); + // } + // return; } // Notify local and remote dependent objects logger.finest("Sending event to dependent objects."); - final Event event = new Event(Consts.hostName, object, val); + final Event event = new Event(Consts.getHostName(), object, val); // Notify remote subscribers - clientEventForwarder.sendEvent(anyPkt.getId(), event, anyPkt.getSource()); + clientEventForwarder.sendEvent(anyPkt.getId(), event, anyPkt.getSource(), timestamp); final Set satConsumers = // consumers.entrySet().stream() @@ -115,7 +161,7 @@ private final void processUpdate(EventProducerPair update) { // Notify local subscribers if (!satConsumers.isEmpty()) { pairs.forEach(pair -> waitingProducers.add(pair.getUpdateProducer())); - final EventPacket newEvPkt = new EventPacket(event, anyPkt.getId(), anyPkt.getSource()); + final EventPacket newEvPkt = new EventPacket(event, anyPkt.getId(), anyPkt.getSource(), timestamp); newEvPkt.setLockReleaseNodes(anyPkt.getLockReleaseNodes()); pendingAcks = satConsumers.size(); satConsumers.forEach(c -> c.updateFromProducer(newEvPkt, this)); @@ -157,9 +203,9 @@ public UpdateProducer filter(SerializablePredicate constraint) { public final synchronized void updateFromProducer(EventPacket packet, UpdateProducer producer) { final EventProducerPair pair = new EventProducerPair(packet, producer); eventQueue.add(pair); - logger.finest("Method update called for event " + pair + ". Added to the queue1."); + logger.finest("Method update called for event " + pair + ". Added to the queue."); if (eventQueue.size() == 1) { - logger.finest("The element is the only one in the queue1. Let's process it."); + logger.finest("The element is the only one in the queue. Let's process it."); processNextUpdate(); } } diff --git a/Dream2/src/main/java/dream/client/Var.java b/Dream2/src/main/java/dream/client/Var.java index 7f840d7..9d9a691 100755 --- a/Dream2/src/main/java/dream/client/Var.java +++ b/Dream2/src/main/java/dream/client/Var.java @@ -29,29 +29,31 @@ public class Var implements UpdateProducer, LockAppli private final List> constraints = new ArrayList<>(); private final Map>> consumers = new HashMap<>(); - private final Queue waitingModifications = new ArrayDeque<>(); + private final Queue waitingModifications = new ArrayDeque<>(); private int pendingAcks = 0; private T val; public Var(String object, T val) { this.forwarder = ClientEventForwarder.get(); - this.host = Consts.hostName; + this.host = Consts.getHostName(); this.object = object; this.val = val; - forwarder.advertise(new Advertisement(Consts.hostName, object), true); + forwarder.advertise(new Advertisement(Consts.getHostName(), object), true); } public final synchronized void set(T val) { + final long timestamp = System.nanoTime(); final Supplier supplier = () -> val; - waitingModifications.add(supplier); + waitingModifications.add(new TimestampedModification(supplier, timestamp)); if (waitingModifications.size() == 1) { tryToProcessNextUpdate(); } } public final synchronized void modify(Consumer modification) { - waitingModifications.add(modification); + final long timestamp = System.nanoTime(); + waitingModifications.add(new TimestampedModification(modification, timestamp)); if (waitingModifications.size() == 1) { tryToProcessNextUpdate(); } @@ -81,7 +83,10 @@ private final void tryToProcessNextUpdate() { } private final void processNextUpdate(UUID eventId) { - final Object mod = waitingModifications.poll(); + final TimestampedModification tMod = waitingModifications.poll(); + final Object mod = tMod.getModification(); + final long timestamp = tMod.getTimestamp(); + // Apply modification if (mod instanceof Consumer) { @SuppressWarnings("unchecked") @@ -94,20 +99,20 @@ private final void processNextUpdate(UUID eventId) { } // Propagate modification to local and remote subscribers - final Event ev = new Event<>(Consts.hostName, object, val); + final Event ev = new Event<>(Consts.getHostName(), object, val); final String source = ev.getSignature(); - final EventPacket packet = new EventPacket(ev, eventId, source); + final EventPacket packet = new EventPacket(ev, eventId, source, System.nanoTime()); packet.setLockReleaseNodes(forwarder.getLockReleaseNodesFor(source)); final Set satConsumers = // - consumers.entrySet().stream().filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))// - .map(e -> e.getKey())// - .collect(Collectors.toSet()); + consumers.entrySet().stream().filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))// + .map(e -> e.getKey())// + .collect(Collectors.toSet()); pendingAcks = satConsumers.size(); satConsumers.forEach(c -> c.updateFromProducer(packet, this)); - forwarder.sendEvent(eventId, ev, ev.getSignature()); + forwarder.sendEvent(eventId, ev, ev.getSignature(), timestamp); } @Override @@ -153,4 +158,24 @@ public void notifyLockGranted(LockGrantPacket lockGrant) { processNextUpdate(lockGrant.getLockID()); } + private class TimestampedModification { + private final Object modification; + private final long timestamp; + + TimestampedModification(Object modification, long timestamp) { + super(); + this.modification = modification; + this.timestamp = timestamp; + } + + final Object getModification() { + return modification; + } + + final long getTimestamp() { + return timestamp; + } + + } + } diff --git a/Dream2/src/main/java/dream/common/ConsistencyType.java b/Dream2/src/main/java/dream/common/ConsistencyType.java index 470691d..5fcc488 100755 --- a/Dream2/src/main/java/dream/common/ConsistencyType.java +++ b/Dream2/src/main/java/dream/common/ConsistencyType.java @@ -10,13 +10,13 @@ public final String toString() { SINGLE_SOURCE_GLITCH_FREE { @Override public final String toString() { - return "Single source glitch free"; + return "Single_source_glitch_free"; } }, COMPLETE_GLITCH_FREE { @Override public final String toString() { - return "Complete glitch free"; + return "Complete_glitch_free"; } }, ATOMIC { diff --git a/Dream2/src/main/java/dream/common/Consts.java b/Dream2/src/main/java/dream/common/Consts.java index dd81aef..d0f50f5 100755 --- a/Dream2/src/main/java/dream/common/Consts.java +++ b/Dream2/src/main/java/dream/common/Consts.java @@ -12,14 +12,18 @@ public final class Consts { private static final Properties properties = new Properties(); private static final String LOGGING_PROPERTIES_FILE_NAME = "logging.properties"; - public static final String serverAddr; - public static final String lockManagerAddr; + public static String serverAddr; + public static String lockManagerAddr; - public static final int serverPort; - public static final int lockManagerPort; + public static int serverPort; + public static int lockManagerPort; public static ConsistencyType consistencyType; - public static String hostName; + private static String hostName; + + public static boolean enableEvaluation; + public static String trafficMeasurementFile = "./traffic.txt"; + public static String delayMeasurementFile = "./delay.txt"; static { /** @@ -56,9 +60,6 @@ public final class Consts { lockManagerPort = Integer.parseInt(lockManagerPortProperty); lockManagerAddr = "reds-tcp:" + lockManagerAddrProperty + ":" + lockManagerPort; - final String hostNameProperty = properties.getProperty("hostName", "local"); - hostName = hostNameProperty; - final String consistencyTypeProperty = properties.getProperty("consistencyType", "single_glitch_free") .toLowerCase(); if (consistencyTypeProperty.equals("causal")) { @@ -73,5 +74,22 @@ public final class Consts { logger.warning("Unknown consistency type. Using single source glitch free as default."); } + final String hostNameProperty = properties.getProperty("hostName", "local"); + hostName = hostNameProperty; + trafficMeasurementFile = "./" + hostName + "_" + consistencyType + "_traffic.txt"; + delayMeasurementFile = "./" + hostName + "_" + consistencyType + "_delay.txt"; + + final String enableEvaluationProperty = properties.getProperty("enableEvaluation", "false"); + enableEvaluation = Boolean.valueOf(enableEvaluationProperty); + } + + public static final void setHostName(String name) { + hostName = name; + trafficMeasurementFile = "./" + hostName + "_" + consistencyType + "_traffic.txt"; + delayMeasurementFile = "./" + hostName + "_" + consistencyType + "_delay.txt"; + } + + public static final String getHostName() { + return hostName; } } diff --git a/Dream2/src/main/java/dream/common/packets/EventPacket.java b/Dream2/src/main/java/dream/common/packets/EventPacket.java index 6a13a02..5bdef99 100755 --- a/Dream2/src/main/java/dream/common/packets/EventPacket.java +++ b/Dream2/src/main/java/dream/common/packets/EventPacket.java @@ -25,13 +25,17 @@ public class EventPacket implements Serializable { // Original source of the change private final String source; + // Time (in nanosecond) when the message is generated + private final long generationTime; + // Nodes that should release the lock for the propagation, if any private final Set lockReleaseNodes = new HashSet<>(); - public EventPacket(Event event, UUID id, String source) { + public EventPacket(Event event, UUID id, String source, long generationTime) { this.event = event; this.id = id; this.source = source; + this.generationTime = generationTime; } public final Event getEvent() { @@ -46,6 +50,10 @@ public final String getSource() { return source; } + public final long getGenerationTime() { + return generationTime; + } + public final void setLockReleaseNodes(Set lockReleaseNodes) { this.lockReleaseNodes.addAll(lockReleaseNodes); } diff --git a/Dream2/src/main/java/dream/locking/LockManagerForwarder.java b/Dream2/src/main/java/dream/locking/LockManagerForwarder.java index d5a04cb..c029768 100644 --- a/Dream2/src/main/java/dream/locking/LockManagerForwarder.java +++ b/Dream2/src/main/java/dream/locking/LockManagerForwarder.java @@ -3,12 +3,16 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.logging.Logger; +import dream.common.Consts; import dream.common.packets.locking.LockGrantPacket; import dream.common.packets.locking.LockReleasePacket; import dream.common.packets.locking.LockRequestPacket; +import dream.eval.utils.EvalUtils; import polimi.reds.NodeDescriptor; import polimi.reds.broker.routing.Outbox; import polimi.reds.broker.routing.PacketForwarder; @@ -17,9 +21,16 @@ public class LockManagerForwarder implements PacketForwarder { private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); private final LockManager lockManager = new LockManager(); + private final Map trafficPkts = new HashMap<>(); + private final Map trafficBytes = new HashMap<>(); + @Override public final Collection forwardPacket(String subject, NodeDescriptor sender, Serializable packet, Collection neighbors, Outbox outbox) { + if (Consts.enableEvaluation) { + EvalUtils.updateTraffic(packet, subject, trafficPkts, trafficBytes); + EvalUtils.saveTrafficToFile(trafficPkts, trafficBytes); + } if (subject.equals(LockRequestPacket.subject)) { assert packet instanceof LockRequestPacket; final LockRequestPacket reqPkt = (LockRequestPacket) packet; diff --git a/Dream2/src/main/java/dream/server/ServerEventForwarder.java b/Dream2/src/main/java/dream/server/ServerEventForwarder.java index f75ed9c..2e320a7 100755 --- a/Dream2/src/main/java/dream/server/ServerEventForwarder.java +++ b/Dream2/src/main/java/dream/server/ServerEventForwarder.java @@ -4,14 +4,18 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.logging.Logger; +import dream.common.Consts; import dream.common.packets.AdvertisementPacket; import dream.common.packets.EventPacket; import dream.common.packets.SubscriptionPacket; +import dream.eval.utils.EvalUtils; import polimi.reds.NodeDescriptor; import polimi.reds.broker.overlay.NeighborhoodChangeListener; import polimi.reds.broker.overlay.NotRunningException; @@ -26,9 +30,12 @@ public class ServerEventForwarder implements PacketForwarder, NeighborhoodChange protected final SubscriptionTable brokersSubTable = new SubscriptionTable(); protected final AdvertisementTable advTable = new AdvertisementTable(); - private final Set allValidAdvertisements = new HashSet<>(); + private final Set allValidAdvertisements = Collections.synchronizedSet(new HashSet<>()); private final Overlay overlay; + private final Map trafficPkts = new HashMap<>(); + private final Map trafficBytes = new HashMap<>(); + public ServerEventForwarder(final Overlay overlay) { this.overlay = overlay; } @@ -36,6 +43,10 @@ public ServerEventForwarder(final Overlay overlay) { @Override public Collection forwardPacket(String subject, NodeDescriptor sender, Serializable packet, Collection neighbors, Outbox outbox) { + if (Consts.enableEvaluation) { + EvalUtils.updateTraffic(packet, subject, trafficPkts, trafficBytes); + EvalUtils.saveTrafficToFile(trafficPkts, trafficBytes); + } if (subject.equals(SubscriptionPacket.subject)) { assert packet instanceof SubscriptionPacket; final SubscriptionPacket subPkt = (SubscriptionPacket) packet; diff --git a/Dream2/src/test/java/dream/LocalTest.java b/Dream2/src/test/java/dream/LocalTest.java index bdd6b03..242eb83 100755 --- a/Dream2/src/test/java/dream/LocalTest.java +++ b/Dream2/src/test/java/dream/LocalTest.java @@ -26,18 +26,18 @@ public void localTest1() { final Var varString2 = new Var<>("varString2", ""); final Signal signalInt = new Signal("signalInt", - () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, varInt); + () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, 1, varInt); final Signal signalString = new Signal("signalString", - () -> varString1.get() + varString2.get(), varString1, varString2); - final Signal signalInt2 = new Signal("signalInt2", () -> signalInt.get() * 2, signalInt); + () -> varString1.get() + varString2.get(), "", varString1, varString2); + final Signal signalInt2 = new Signal("signalInt2", () -> signalInt.get() * 2, 1, signalInt); final Var varStart = new Var<>("varStart", Integer.valueOf(1)); - final Signal signalMid1 = new Signal("signalMid1", () -> varStart.get() * 2, varStart); - final Signal signalMid2 = new Signal("signalMid2", () -> signalMid1.get() * 2, signalMid1); + final Signal signalMid1 = new Signal("signalMid1", () -> varStart.get() * 2, 1, varStart); + final Signal signalMid2 = new Signal("signalMid2", () -> signalMid1.get() * 2, 1, signalMid1); final Signal signalFinal = new Signal("signalFinal", - () -> signalMid1.get() + signalMid2.get(), signalMid1, signalMid2); + () -> signalMid1.get() + signalMid2.get(), 1, signalMid1, signalMid2); final Signal signalFinal2 = new Signal("signalFinal2", - () -> signalMid1.get() + varStart.get(), signalMid1, varStart); + () -> signalMid1.get() + varStart.get(), 1, signalMid1, varStart); try { Thread.sleep(500); diff --git a/Dream2/src/test/java/dream/UnknownVar.java b/Dream2/src/test/java/dream/UnknownVar.java index 7dd1a37..f20d1c4 100644 --- a/Dream2/src/test/java/dream/UnknownVar.java +++ b/Dream2/src/test/java/dream/UnknownVar.java @@ -27,7 +27,7 @@ public void testUnknownVar() { return ""; else return listener.get(); - } , listener); + }, "", listener); listenerSignal.change().addHandler((o, n) -> System.out.println(o + "->" + n)); }